Introduction
As part of the web server I'm writing, I needed a workflow engine capable of executing a sequence of workflow functions even when that sequence spans two or more threads. Here's an example:
A single listener thread receives a request. The workflow here includes instrumentation (logging) and a blacklist check. If the request passes the blacklist check, it is queued for processing. This allows the request listener to return to listening for connections very quickly.
Separately, another thread waits for requests on its queue, and when received, it chooses a worker thread on which to enqueue the request. We instrument worker thread load as part of this process.
Lastly, the worker thread itself does an authentication check, routes the request for any custom processing and then passes the data to the view engine for final rendering.
So, as you can see, this is a linear workflow, but it spans three separate threads. The implementation I present below is a solution for managing these workflows without creating too much overhead.
Why This is Cool
Through the use of a "continuation" class, we span threads safely when executing a workflow. Furthermore, rather than, say, cloning the workflow for a specific thread, we can declare the workflow once and preserve the state of each run in a continuation instance. This is a simple and efficient way of decoupling the work to be done from the threads doing the work: the work is declared independently of the threads that perform it. Even more useful, the work process itself can determine whether the work should continue on the same thread or be deferred for processing on a different thread.
So what we achieve is a high level of abstraction with very little cost. We can:
- define workflows declaratively
- decouple the thread from the work implementation
- allow the work implementation to determine how work should be continued:
  - on the same thread
  - deferred to another thread
The Process Signature
It'll help to keep in mind what is being called for each step of the workflow. This must be a function with the following signature:
Func<WorkflowContinuation<T>, T, WorkflowState> doWork
The function receives a WorkflowContinuation<T> instance and a T instance and is expected to return a WorkflowState. T is the type of the data passed from one workflow function to the next.
WorkflowState is defined as follows:
public enum WorkflowState
{
    Abort,
    Continue,
    Defer,
}
An example of an actual logging workflow function would look like this:
static WorkflowState LogIPAddress(
    WorkflowContinuation<HttpListenerContext> workflowContinuation,
    HttpListenerContext context)
{
    Console.WriteLine(context.Request.RemoteEndPoint.ToString());
    return WorkflowState.Continue;
}
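The introduction mentions a blacklist check, which is the natural case for returning Abort. The following is only a sketch of what such a step might look like: the BlacklistSketch class, the hard-coded address set, and the use of a plain string in place of HttpListenerContext are all assumptions made so that the example compiles on its own. WorkflowContinuation<T> is reduced to an empty stand-in here.

```csharp
using System;
using System.Collections.Generic;

public enum WorkflowState { Abort, Continue, Defer }

// Empty stand-in so this sketch compiles on its own; the article's
// WorkflowContinuation<T> carries the actual workflow state.
public class WorkflowContinuation<T> { }

public static class BlacklistSketch
{
    // Hypothetical blacklist; a real server would load this from configuration.
    static readonly HashSet<string> blocked = new HashSet<string> { "203.0.113.7" };

    // Same shape as Func<WorkflowContinuation<T>, T, WorkflowState> with T = string.
    public static WorkflowState CheckBlacklist(
        WorkflowContinuation<string> workflowContinuation,
        string remoteAddress)
    {
        // Abort stops the workflow; Continue lets the next step run.
        return blocked.Contains(remoteAddress)
            ? WorkflowState.Abort
            : WorkflowState.Continue;
    }

    public static void Main()
    {
        Console.WriteLine(CheckBlacklist(null, "203.0.113.7"));   // Abort
        Console.WriteLine(CheckBlacklist(null, "198.51.100.20")); // Continue
    }
}
```

A step like this would sit before the step that queues the request, so a blocked address never reaches the queue.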
Workflow Item
A WorkflowItem is a lightweight container for the workflow function:
using System;

public class WorkflowItem<T>
{
    protected Func<WorkflowContinuation<T>, T, WorkflowState> doWork;

    public WorkflowItem(Func<WorkflowContinuation<T>, T, WorkflowState> doWork)
    {
        this.doWork = doWork;
    }

    // Runs the wrapped workflow function and returns its result unchanged.
    public WorkflowState Execute(WorkflowContinuation<T> workflowContinuation, T data)
    {
        return doWork(workflowContinuation, data);
    }
}
The Workflow Class
The Workflow class manages a workflow: the collection of functions to execute, in order.
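using System.Collections.Generic;

public class Workflow<T>
{
    protected List<WorkflowItem<T>> items;

    public Workflow()
    {
        items = new List<WorkflowItem<T>>();
    }

    public void AddItem(WorkflowItem<T> item)
    {
        items.Add(item);
    }

    // Starts a new run of the workflow with its own continuation.
    public void Execute(T data)
    {
        WorkflowContinuation<T> continuation = new WorkflowContinuation<T>(this);
        InternalContinue(continuation, data);
    }

    // Resumes a deferred run, typically on the thread that picked up the work.
    public void Continue(WorkflowContinuation<T> wc, T data)
    {
        if (!wc.Abort)
        {
            wc.Defer = false;
            InternalContinue(wc, data);
        }
    }

    protected void InternalContinue(WorkflowContinuation<T> wc, T data)
    {
        while ((wc.WorkflowStep < items.Count) && !wc.Abort && !wc.Defer)
        {
            // The step index is advanced before the step runs, so a deferred
            // run resumes at the next step.
            WorkflowState state = items[wc.WorkflowStep++].Execute(wc, data);

            switch (state)
            {
                case WorkflowState.Abort:
                    wc.Abort = true;
                    break;

                case WorkflowState.Defer:
                    // Return without writing to wc: the step has already handed
                    // the continuation to another thread, which may be running
                    // Continue right now. Setting wc.Defer here could stop that
                    // thread's loop early.
                    return;
            }
        }
    }
}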
There is a nice, simple elegance to this: a workflow is started by calling the Execute method. If a function passes the work on to another thread, it returns the Defer state. The thread that picks up the workflow can continue the workflow by calling the Continue method.
The Workflow Continuation
The real meat though, which makes all this work, is the WorkflowContinuation class:
public class WorkflowContinuation<T>
{
    public int WorkflowStep { get; set; }
    public bool Abort { get; set; }
    public bool Defer { get; set; }
    public Workflow<T> Workflow { get; protected set; }

    public WorkflowContinuation(Workflow<T> workflow)
    {
        Workflow = workflow;
    }
}
Well, for being the "meat", it doesn't seem like it does much! The point, though, is that this class tracks the state of a single run of a workflow and allows that run to continue when it is passed to another thread. This gives us the following:
- We can define a single instance of a particular workflow pattern.
- We can use that instance from several threads simultaneously because we are effectively implementing Continuation Passing Style: we pass the continuation state in to each workflow function rather than storing it in the workflow.
- As a result, the workflow, as a process, is thread safe even though the workflow instance is shared amongst different threads, provided each continuation is used by only one thread at a time.
The only performance penalty is that, when a workflow begins its execution, we have to create the WorkflowContinuation:
WorkflowContinuation<T> continuation = new WorkflowContinuation<T>(this);
Otherwise, there's nothing else that needs to be allocated to get all this to work.
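To illustrate the point about sharing one workflow instance, here is a minimal sketch that runs many requests through a single workflow in parallel. It uses a condensed engine rather than the classes exactly as listed above (no WorkflowItem wrapper, no Defer handling), and the Request class, SharedWorkflowSketch, and the use of Parallel.For are assumptions made for the demonstration. Each run gets its own continuation and its own data, so the runs do not interfere with each other.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public enum WorkflowState { Abort, Continue, Defer }

public class WorkflowContinuation<T>
{
    public int WorkflowStep { get; set; }
    public Workflow<T> Workflow { get; }
    public WorkflowContinuation(Workflow<T> workflow) { Workflow = workflow; }
}

// Condensed engine for this sketch only.
public class Workflow<T>
{
    readonly List<Func<WorkflowContinuation<T>, T, WorkflowState>> items =
        new List<Func<WorkflowContinuation<T>, T, WorkflowState>>();

    public void AddItem(Func<WorkflowContinuation<T>, T, WorkflowState> doWork) => items.Add(doWork);

    public void Execute(T data)
    {
        // All per-run state lives in the continuation, not in the workflow.
        var wc = new WorkflowContinuation<T>(this);
        while (wc.WorkflowStep < items.Count)
        {
            if (items[wc.WorkflowStep++](wc, data) != WorkflowState.Continue) return;
        }
    }
}

// Hypothetical request type that records how many steps ran for it.
public class Request { public int StepsRun; }

public static class SharedWorkflowSketch
{
    public static int Run(int requestCount)
    {
        // Declared once, then only read while requests are in flight.
        var workflow = new Workflow<Request>();
        for (int s = 0; s < 3; s++)
            workflow.AddItem((wc, r) => { r.StepsRun++; return WorkflowState.Continue; });

        int complete = 0;
        Parallel.For(0, requestCount, i =>
        {
            var request = new Request();
            workflow.Execute(request);
            if (request.StepsRun == 3) Interlocked.Increment(ref complete);
        });
        return complete;
    }

    public static void Main() => Console.WriteLine(Run(1000)); // 1000
}
```

Note that the sketch finishes adding steps before any request runs; the list of steps is treated as read-only afterwards.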
Some Examples
Some basic examples should help to get the idea. Here's how a basic workflow (simpler than the three-thread one described in the introduction) can be declared:
// workflow and handler are fields declared elsewhere in the server.
static void InitializeWorkflow()
{
    workflow = new Workflow<HttpListenerContext>();
    workflow.AddItem(new WorkflowItem<HttpListenerContext>(LogIPAddress));
    workflow.AddItem(new WorkflowItem<HttpListenerContext>(handler.Process));
    workflow.AddItem(new WorkflowItem<HttpListenerContext>(CommonHandler.CommonResponse));
}
When we're ready, we can start the workflow with an instance of our data, in this case an HttpListenerContext:
workflow.Execute(context);
You already saw an example of "instrumentation":
static WorkflowState LogIPAddress(
    WorkflowContinuation<HttpListenerContext> workflowContinuation,
    HttpListenerContext context)
{
    Console.WriteLine(context.Request.RemoteEndPoint.ToString());
    return WorkflowState.Continue;
}
Note how the workflow continuation is passed in (though we don't use it here) and note how the function returns the Continue state.
Now at some point, the data will be enqueued so that another thread can process it:
public WorkflowState Process(
    WorkflowContinuation<HttpListenerContext> workflowContinuation,
    HttpListenerContext context)
{
    requests.Enqueue(new WorkflowContext(workflowContinuation, context));
    semQueue.Release();
    return WorkflowState.Defer;
}
Notice here how the continuation is enqueued along with the request context. Notice also that this function returns the workflow state Defer. This indicates to the workflow engine that the continuation of this workflow is being deferred until another thread picks up the work.
Lastly, the thread that picks up the work calls the Continue function:
ts.WaitOne();
WorkflowContext context;

if (ts.TryDequeue(out context))
{
    context.WorkflowContinuation.Workflow.Continue(context.WorkflowContinuation, context.Context);
}
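The snippets above rely on a WorkflowContext class and a queue that are not listed in this article. As a rough, self-contained sketch of the whole hand-off, the example below defers a run from a "listener" thread to a "worker" thread. Everything specific to it is an assumption: the generic WorkflowContext<T> shape, the condensed engine (which simply returns when a step defers or aborts), the List<string> trace used as the data, and BlockingCollection standing in for the semaphore-plus-queue pair used in the server.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public enum WorkflowState { Abort, Continue, Defer }

public class WorkflowContinuation<T>
{
    public int WorkflowStep { get; set; }
    public Workflow<T> Workflow { get; }
    public WorkflowContinuation(Workflow<T> workflow) { Workflow = workflow; }
}

// Condensed engine for this sketch only.
public class Workflow<T>
{
    readonly List<Func<WorkflowContinuation<T>, T, WorkflowState>> items =
        new List<Func<WorkflowContinuation<T>, T, WorkflowState>>();

    public void AddItem(Func<WorkflowContinuation<T>, T, WorkflowState> doWork) => items.Add(doWork);

    public void Execute(T data) => Continue(new WorkflowContinuation<T>(this), data);

    public void Continue(WorkflowContinuation<T> wc, T data)
    {
        while (wc.WorkflowStep < items.Count)
        {
            // Stop on Abort or Defer. After Defer, another thread may own wc,
            // so this thread does not touch it again.
            if (items[wc.WorkflowStep++](wc, data) != WorkflowState.Continue) return;
        }
    }
}

// Hypothetical container pairing a continuation with its data.
public class WorkflowContext<T>
{
    public WorkflowContinuation<T> WorkflowContinuation { get; }
    public T Context { get; }
    public WorkflowContext(WorkflowContinuation<T> wc, T context)
    {
        WorkflowContinuation = wc;
        Context = context;
    }
}

public static class DeferSketch
{
    public static List<string> Run()
    {
        var requests = new BlockingCollection<WorkflowContext<List<string>>>();
        var workflow = new Workflow<List<string>>();

        // Step 1 runs on whichever thread starts the workflow.
        workflow.AddItem((wc, trace) =>
        {
            trace.Add("log:" + Thread.CurrentThread.Name);
            return WorkflowState.Continue;
        });
        // Step 2 hands the continuation and data to the worker, then defers.
        workflow.AddItem((wc, trace) =>
        {
            requests.Add(new WorkflowContext<List<string>>(wc, trace));
            return WorkflowState.Defer;
        });
        // Step 3 runs on the thread that calls Continue.
        workflow.AddItem((wc, trace) =>
        {
            trace.Add("render:" + Thread.CurrentThread.Name);
            return WorkflowState.Continue;
        });

        var result = new List<string>();
        var worker = new Thread(() =>
        {
            WorkflowContext<List<string>> context = requests.Take(); // blocks until work arrives
            context.WorkflowContinuation.Workflow.Continue(context.WorkflowContinuation, context.Context);
        }) { Name = "worker" };
        var listener = new Thread(() => workflow.Execute(result)) { Name = "listener" };

        worker.Start();
        listener.Start();
        listener.Join();
        worker.Join();
        return result;
    }

    public static void Main() =>
        Console.WriteLine(string.Join(", ", Run())); // log:listener, render:worker
}
```

The trace shows the first step running on the listener and the last step running on the worker, with the same continuation carrying the step index across the hand-off.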
Conclusion
That's it. There really isn't much to this concept. It actually seems to take longer to explain the concept than to write the code that implements it.