Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles
(untagged)

Castle Dynamic Proxy Interceptors to Build Restartable Flows

0.00/5 (No votes)
3 Nov 2020 1  
Another way of using proxies and interceptors from Castle Dynamic Proxy framework
This article demonstrates another way of using proxies and interceptors from the Castle Dynamic Proxy framework. We implemented an interruptible and restartable flow, that is really lightweight, it is debuggable, and the flow logic is in the C# method. Also, we used Moq for unit testing and proved that for verifying and debugging your code, you don't need to spend time creating and running an application.

You can find the first article about Dynamic Proxies at the link below:

Introduction

Hello there, welcome to another story about proxies and interceptors.

Quote:

I have to thank Larry Ross for pointing out Aspect-Oriented Programming to the Pro Coders team.

If you were not amazed by my previous blog post where we discussed the model change tracking and rule execution, I will try to impress you this second time. Let's assume you have a magic technology to interrupt a .NET method, save its state, and restart from the saved point later, or maybe on another machine. You maybe think that you can do that using the Windows hibernate function, save the Windows image to a large file, copy it, and restore it on another virtual machine. But no! I am talking about a small state of about one hundred bytes, that can be saved to a database, read by another machine, and restarted from the saved point.

Are you intrigued?

Let's write a user story then.

User Story #4: Create an Interruptible and Restartable in Another AppDomain Flow

  • Logic and sequence of the flow steps should be in a C# method
  • Each step should have the ability to interrupt the flow execution
  • The state of the interrupted flow and flow code should be enough to restart the flow in another Application Domain

Implementation - Generic Flow

To start, I created a new .NET Core 3.1 project DemoCatleProxy.RestartableFlow and added Castle.Core NuGet package.

Architecturally, I think about this problem in the way that we may have many different flows and an engine that will be used to execute any one of these flows and restart them from the saved point when needed. If we think about it in terms of the business document circulation - each type of document will have a unique flow, and it will contain different steps like submission, review, approval, second approval, etc. and in different sequences of these steps.

Now I would like to define a template for flows that we are going to implement. Firstly, we need an interface that will define the flow, which we will use in the flow engine to reference the flows:

public interface IFlow
{
    object UntypedModel { get; }
    void Execute();
    internal void SetModel(object model);
}

The Execute method is a container of the flow logic and the rest declaration is for setting up our Model.

You can think of Model as a Document itself (in circulation) plus all related supporting documentation - signatures, dates of signatures, current status, next address for delivery, etc.

Each new flow we execute should have its own Model. So, my next interface is a generic template with the Model argument:

public interface IFlow<M> : IFlow
    where M : class
{
    M Model { get; }
}

Thus, each time you define a new flow for a new Document - you supply a Model argument, for example:

public class MyFlow : IFlow<MyModel>

Now to simplify future flow definitions, we create a base class that implements basic functions:

public abstract class Flow<M> : IFlow<M>
    where M : class, new()
{
    protected M _model;
    public M Model => _model;
    public object UntypedModel => _model;

    public Flow()
    {
        _model = new M();
    }

    void IFlow.SetModel(object model)
    {
        _model = model as M;
    }

    public abstract void Execute();
}

The base class allows working with the typed Model and it creates an instance of the Model in the constructor.

Eventually, we can try to define our first flow (and model) and see how it works, and I would like to do it in a unit test as usual. I created a new xUnit project DemoCatleProxy.RestartableFlow.Tests and added a project reference to the main project. Now let's add flow code:

using System;
using System.Collections.Generic;
using System.Text;
using System.Text.Json;

namespace DemoCatleProxy.RestartableFlow.Tests
{
    public class Model1 
    {
        public string ReceivedMessage { get; set; }
        public string Signature { get; set; }

        public bool IsSubmitted { get; set; }
        public bool IsLoaded { get; set; }
    }

    public interface IDemoDataService
    {
        string LoadReceivedMessage();
        bool IsMessageApproved(string message);
        string GetSignature(string message);
        bool Submit(string message, string signature);
    }

    public class DemoFlow1 : Flow<Model1>
    {
        private readonly IDemoDataService _dataService;

        public DemoFlow1()
        {
        }

        public DemoFlow1(IDemoDataService dataService)
        {
            _dataService = dataService;
        }

        public override void Execute()
        {
            LoadData();

            CheckIfApproved();

            AddDigitalSignature();

            SubmitData();
        }

        public virtual void LoadData()
        {
            if (Model.IsLoaded)
            {
                throw new FlowFatalTerminateException();
            }

            Model.ReceivedMessage = _dataService.LoadReceivedMessage();
            Model.IsLoaded = true;
        }

        public virtual void CheckIfApproved()
        {
            if (!_dataService.IsMessageApproved(Model.ReceivedMessage))
            {
                throw new FlowStopException();
            }
        }

        public virtual void AddDigitalSignature()
        {
            Model.Signature = _dataService.GetSignature(Model.ReceivedMessage);
        }

        public virtual void SubmitData()
        {
            if (!_dataService.Submit(Model.ReceivedMessage, Model.Signature))
            {
                throw new FlowStopException();
            }
        }
    }
}

I defined Model1 and DemoFlow1, and also for better realism, I added the IDemoDataService contract (interface) that will be used by the flow to communicate with the outside world.

If you look at the Execute method, you will see it is a sequence of four methods, each of them ([LoadData, CheckIfApproved, AddDigitalSignature, and SubmitData) must be virtual and I will explain why.

We will use proxy interceptors to intercept the execution of each of the used methods. When we intercept a call to the proxy object, we can make a decision of whether we should allow the call or skip it. If we restart the flow, we should be able to skip all calls until we come to the point of the previous stop. So, the methods we call from Execute must be virtual.

If you have a look at LoadData, it checks that the Model has not been loaded before, to make sure that we start the flow with a clean and fresh Model. There is an extra check for us: the method throws FlowFatalTerminateException (we will define exceptions a bit later) if the Model signals that it is loaded already - this flow will be marked as corrupted, but if the Model has not been loaded yet, we read the message from a data service and set the Model IsLoaded flag.

The next step in the sequence is the CheckIfApproved method. It asks a data service if the message is approved and if not - it throws another exception FlowStopException or continues execution. If the flow is stopped (by the FlowStopException]), the flow engine will return status IsStopped and this flow can be restarted later.

Important: When I talk about restartable flow, I mean an instance that we created once and run once can be finished successfully, or it can return status [IsStopped], then we can save this stopped instance in a database and try to restart it later.

We use exceptions in the flows because this is the most effective way of stopping execution at a particular point and the exception will be bubbled up in the call stack until somebody catches it.

Let's add the exceptions code to the main project:

using System;
using System.Collections.Generic;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public class FlowStopException : Exception
    {
    }

    public class FlowFatalTerminateException : Exception
    {
    }
}

We will use FlowStopException to inform the flow engine that flow stopped normally and can be restarted later, and we will use FlowFatalTerminateException to inform the engine that flow is corrupted.

Implementation - Flow Engine

The logic of the first flow run is the following ([DemoFlow1 for example):

  1. We create a new instance of DemoFlow1 class (Model is created automatically by Flow base class from which we inherited) and supply it to the flow engine.
  2. The flow engine creates a flow proxy object of the DemoFlow1 instance and a flow data - that is a full history of all changes that happened within the DemoFlow1 instance.
  3. Then the flow engine executes the flow proxy Execute method, which calls LoadData, CheckIfApproved, AddDigitalSignature, SubmitData one by one.
  4. Each call to the mentioned methods is intercepted and we save to flow data every time the method is called, and a copy of the Model obtained after the called method.
  5. If any of the called methods throws an exception, the Execute method is interrupted and the flow engine returns flow data.
  6. If the Execute method is finished without any interruption, the flow engine returns flow data with status IsFinished = true.

The logic to restart the flow is:

  1. We create a new instance of DemoFlow1 and, having flow data from the previous run, we supply the flow instance and flow data to the flow engine.
  2. The flow engine creates a flow proxy object of the DemoFlow1 instance and uses supplied flow data.
  3. Then the flow engine executes the flow proxy Execute method, which calls LoadData, CheckIfApproved, AddDigitalSignature, SubmitData one by one.
  4. Each call to the mentioned methods is intercepted and we check if the called method is already recorded in the flow data history. If yes, then we skip this call and substitute the current DemoFlow1 instance Model by the model from the flow data history, to make it the same as it was after calling of this method on the first run.
  5. If the call is not found in the flow data history, we proceed with the call and store it and the resulting Model to the history, continue the flow execution the same way as we did on the first run.

For the flow engine, we will need an interface:

using System;
using System.Collections.Generic;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public interface IFlowEngine
    {
        FlowData RunFlow(IFlow flow);
        FlowData RestartFlow(IFlow flow, FlowData flowData);
    }
}

and flow data class:

using System;
using System.Collections.Generic;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public class FlowData
    {
        public bool IsFinished { get; set; }
        public List<string> CallHistory { get; set; } = new List<string>();
        public List<object> ModelHistory { get; set; } = new List<object>();
        public bool IsStopped { get; set; }
        public Exception LastException { get; set; }
    }
}

Now we can present the flow engine:

using Castle.DynamicProxy;
using System;
using System.Collections.Generic;
using System.Text;
using System.Xml.Schema;

namespace DemoCatleProxy.RestartableFlow
{
    public class FlowEngine : IFlowEngine, IInterceptor
    {
        private readonly IProxyGenerator _proxyGenerator;
        private FlowData _flowData;
        private IFlow _flow;
        private int _counter;

        public FlowEngine(IProxyGenerator proxyGenerator)
        {
            _proxyGenerator = proxyGenerator;
        }

        public FlowData RunFlow(IFlow flow)
        {
            _flowData = new FlowData();
            return ProcessFlow(flow);
        }

        public FlowData RestartFlow(IFlow flow, FlowData flowData)
        {
            _flowData = flowData;
            return ProcessFlow(flow);
        }

        private FlowData ProcessFlow(IFlow flow)
        {
            var options = new ProxyGenerationOptions(new FreezableProxyGenerationHook(flow));
            var flowProxy = _proxyGenerator.CreateClassProxyWithTarget(flow.GetType(), 
                            flow, options, new IInterceptor[] { this }) as IFlow;
            _flow = flow;

            try
            {
                // clear previous statuses
                _counter = 0;
                _flowData.IsStopped = false;
                _flowData.LastException = null;

                // run flow
                flowProxy.Execute();
                _flowData.IsFinished = true;
            }
            catch (FlowStopException e)
            {
                _flowData.IsStopped = true;
            }
            catch (Exception e)
            {
                _flowData.LastException = e;
            }

            return _flowData;
        }

        public void Intercept(IInvocation invocation)
        {
            var method = invocation.Method.Name;
            _counter++;
            var historyRecord = $"{_counter}:{method}";

            var index = _flowData.CallHistory.IndexOf(historyRecord);

            if (index == -1)
            {
                // new call, proceed and update histories if no exceptions thrown
                invocation.Proceed();
                _flowData.CallHistory.Add(historyRecord);

                // Clone Model to store new independednt instance
                _flowData.ModelHistory.Add(_flow.UntypedModel.CloneObject());
            }
            else
            {
                // replay in vacuum: don't proceed call and substitute model for next call
                _flow.SetModel(_flowData.ModelHistory[index]);
            }
        }
    }
}

As you can see, the FlowEngine class keeps an internal state in private fields, so it cannot be declared as a singleton, we will need to create a new instance of the engine each time we need it and it cannot be shared between several threads.

If you go through the engine implementation, you will see that it does all that we described above.

The flow engine uses CloneObject extension to create a new instance of the Model object and to copy all its properties. Please add Newtonsoft.Json NuGet package and this code:

using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public static class ObjectExtension
    {
        public static T CloneObject<T>(this T source)
        {
            var jsonSerializerSettings = new JsonSerializerSettings
            {
                TypeNameAssemblyFormatHandling = TypeNameAssemblyFormatHandling.Simple,
                TypeNameHandling = TypeNameHandling.Objects
            };

            var json = JsonConvert.SerializeObject(source, jsonSerializerSettings);
            var result = JsonConvert.DeserializeObject<T>(json, jsonSerializerSettings);
            return result;
        }
    }
}

Maybe it is not the most effective way, but I didn't want to overcomplicate this article.

The last bit to be added now is a hook, which is required to be supplied to the proxy generation method options. It sets up the behavior when all methods that are called from other methods ([Execute calls LoadData]) will be executed on the proxy level (not on the original object level) so we will intercept all calls. I took this from the Castle Dynamic Proxy Tutorial. The hook code is:

using Castle.DynamicProxy;
using System;
using System.Collections.Generic;
using System.Reflection;
using System.Text;

namespace DemoCatleProxy.RestartableFlow
{
    public class FreezableProxyGenerationHook : IProxyGenerationHook
    {
        private IFlow _flow;

        public FreezableProxyGenerationHook(IFlow flow)
        {
            _flow = flow;
        }

        public override int GetHashCode()
        {
            return _flow.GetHashCode();
        }

        public override bool Equals(object obj)
        {
            return _flow == (obj as FreezableProxyGenerationHook)._flow;
        }

        public bool ShouldInterceptMethod(Type type, MethodInfo memberInfo)
        {
            return memberInfo.Name != "Execute" && memberInfo.Name != "SetModel";
        }

        public void NonVirtualMemberNotification(Type type, MemberInfo memberInfo)
        {
        }

        public void MethodsInspected()
        {
        }

        public void NonProxyableMemberNotification(Type type, MemberInfo memberInfo)
        {
        }
    }
}

Now we can compile our code and start testing it.

Implementation - Unit Test

Let's implement a unit test that will verify a simple scenario:

  1. We create a flow and run it the first time, and the demo service IsMessageApproved method returns False, so the flow should be interrupted on the CheckIfApproved method.
  2. Keeping flow data after the first run, we try to restart the flow, but at this time IsMessageApproved returns True, so we expect the flow to be finished successfully.

I added code to the DemoFlowTests.cs file:

using Castle.DynamicProxy;
using System;
using Xunit;
using Moq;

namespace DemoCatleProxy.RestartableFlow.Tests
{
    public class DemoFlowTests
    {
        [Fact]
        public void RunStopRestartFlowTest()
        {
            var flowEngine = new FlowEngine(new ProxyGenerator());

            var demoService = new Mock<IDemoDataService>();
            var flow = new DemoFlow1(demoService.Object);
            int approveTimes = 0;

            demoService.Setup(s => s.LoadReceivedMessage()).Returns("Important message 1");
            demoService.Setup(s => s.GetSignature(It.IsAny<string>())).Returns("0xAABBEFA7");
            demoService.Setup(s => s.Submit(It.IsAny<string>(), 
                                   It.IsAny<string>())).Returns(true);
           
            // the first time it returns false, the second time it returns true
            demoService.Setup(s => s.IsMessageApproved(It.IsAny<string>()))
                .Returns(() => 
                {
                    approveTimes++;
                    return approveTimes == 2; 
                });

            var flowData = flowEngine.RunFlow(flow);
            Assert.True(flowData.IsStopped);
            Assert.False(flowData.IsFinished);
            Assert.Single(flowData.ModelHistory);
            Assert.True((flowData.ModelHistory[0] as Model1)?.IsLoaded);

            // assume we saved flowData to a database and rerun the flow one day after
            var clonedFlowData = flowData.CloneObject();
            var newFlow = new DemoFlow1(demoService.Object);
            clonedFlowData = flowEngine.RestartFlow(newFlow, clonedFlowData);
            Assert.False(clonedFlowData.IsStopped);
            Assert.True(clonedFlowData.IsFinished);
        }
    }
}

When you use Dependency Injection, you can save a lot of your time and effort in unit testing. I used the Moq framework (please install Moq NuGet package) to generate a demo data service by interface IDemoDataService. The real demo data service has not been implemented yet - we don't need it for testing.

I set up all four methods, and three of them return constant values, but IsMessageApproved will return False the first time and True the next time.

My test creates and runs a flow the first time and checks the resulted flow data, it should have IsStopped = True and IsFinished = False. The flow data contains a state of the interrupted flow. This state is everything that we need in order to restart the flow.

We cloned flow data (as you remember, I serialize it to JSON and deserialize it back) to demonstrate that flow data is transferable and doesn't depend on the AppDomain.

And eventually, I call RestartFlow. Now the test expects that IsStopped = False and IsFinished = True.

If you put a breakpoint on the line with the RestartFlow call and use Step Into until you come to the Intercept method, you can see how the flow engine skipping logic works:

 

Image 1

 

I recommend playing with the debugger to see how the flow is executed the first time and how it is restarted to finish.

Summary

This article demonstrates another way of using proxies and interceptors from the Castle Dynamic Proxy framework. We implemented an interruptible and restartable flow, that conceptually is similar to the Microsoft Workflow Foundation flows but it is really lightweight, it is debuggable, and the flow logic is in the C# method. Also, we used Moq for unit testing and we have, once again, proven that for verifying and debugging your code, you don't need to spend time creating and running an application.

Save time with unit testing, and thank you for reading.

P.S.

I got some feedback on this blog from my friends and actually, they were interested in why the simple straightforward flow containing a sequence of four methods is implemented in this complex way? 

The thing is that the flow example in this blog is simple for a better understanding. In the real-life scenario, your flow will contain if conditions, goto and maybe loops and the approach will continue working even with the complex flows. This is an example flow from one of Pro Coders team projects:

public override async Task Execute()
{
    await BeginAsync();
    await PopulateData();
    await FlowTask(typeof(InsuranceClaimValidationRulesTask));
            
dataEntry:
    if (TaskExecutionValidationIssues.Any())
    {
        await UserInput(typeof(InsuranceClaimEntryForm));
    }

    if (Model.Claim.ClaimAmount.Amount > 1000)
    {
        await UserInputReview(typeof(InsuranceClaimEntryForm));

        if (Model.ClaimRejected)
        {
            await SaveRejectedClaim();
            await GenerateRejectLetter();
            goto postDocuments;
        }

        if (TaskExecutionValidationIssues.Any())
        {
            goto dataEntry;
        }
    }

    if (Model.Claim.PaymentOptionCode == "EFT")
    {
        await MakeTransfer();
    }
    else if (Model.Claim.PaymentOptionCode == "CHQ")
    {
        await PrintBankCheque();
    }
    else if (Model.Claim.PaymentOptionCode == "FUT_CONTR")
    {
        await BuyOptions();
    }
    else
    {
        Fail($"Invalid Payment Option {Model.Claim.PaymentOptionCode}", _logStreamer);
    }

    await SaveClaim();
    await GenerateSuccessLetter();

postDocuments:
    await PostProducedDocuments();
    await EndAsync();
}

As you can see this flow engine also supports asynchronous operations async-await and users input in dynamic forms, but the principle is the same - the flow can be stopped or even fail at some point and restarted later in a day or in a month and on another machine.

History

  • 31st October, 2020: Initial version

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.

A list of licenses authors might use can be found here