A Thread-Spanning Workflow

22 Mar 2015 | CPOL | 4 min read
Easily declare workflows that can span threads.

Introduction

As part of the web server I'm writing, I needed a workflow whose steps can execute across two or more threads.  Here's an example:

[Image 1: the example workflow, spanning the listener thread, the queueing thread, and a worker thread]

A single thread listener receives a request.  The workflow here includes instrumentation (logging) and a blacklist check.  If the request passes the blacklist check, then it is queued for processing.  This allows the request listener to return to listening for connections very quickly. 

Separately, another thread waits for requests on its queue, and when received, it chooses a worker thread on which to enqueue the request.  We instrument worker thread load as part of this process.

Lastly, the worker thread itself does an authentication check, routes the request for any custom processing and then passes the data to the view engine for final rendering.

So, as you can see, this is a linear workflow, but it spans three separate threads.  The implementation I present below is a solution for managing these workflows without creating too much overhead.

Why This is Cool

Through the use of a "continuation" class, we span threads safely when executing a workflow.  Furthermore, rather than, say, cloning the workflow for a specific thread, we can declare the workflow once and preserve the workflow state in the continuation instance.  This is a simple and efficient way of decoupling the work to be done from the threads doing the work: the work can be declared independently of the threads that perform it.  Even more useful, each workflow step can itself determine whether the work should continue on the same thread or be deferred for processing on a different thread.

So what we achieve is a high level of abstraction with very little cost.  We can:

  1. define workflows declaratively
  2. decouple the thread from the work implementation
  3. allow the work implementation to determine how work should be continued:
    1. on the same thread
    2. deferred to another thread

The Process Signature

It'll help to keep in mind what is being called for each step of the workflow.  This must be a function with the following signature:

Func<WorkflowContinuation<T>, T, WorkflowState> doWork

The function receives a WorkflowContinuation<T> instance and a T instance and is expected to return a WorkflowState.  T is the data type being passed around from workflow function to workflow function.

WorkflowState is defined as follows:

public enum WorkflowState
{
  /// <summary>
  /// Terminate execution of the workflow.
  /// </summary>
  Abort,

  /// <summary>
  /// Continue with the execution of the workflow.
  /// </summary>
  Continue,

  /// <summary>
  /// Execution is deferred until Continue is called, usually by another thread.
  /// </summary>
  Defer,
}

An example of an actual logging workflow function would look like this:

/// <summary>
/// A workflow item, implementing a simple instrumentation of the client IP address and port.
/// </summary>
static WorkflowState LogIPAddress(
      WorkflowContinuation<HttpListenerContext> workflowContinuation, 
      HttpListenerContext context)
{
  Console.WriteLine(context.Request.RemoteEndPoint.ToString());

  return WorkflowState.Continue;
}
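
The same signature also covers steps that stop the workflow.  As a minimal sketch, here is what the blacklist check mentioned in the introduction might look like.  The BlacklistStep class, its Blocked set, and the use of a plain string for the client address are all assumptions made for illustration, and the continuation type is reduced to an empty stand-in so the snippet compiles on its own:

```csharp
using System;
using System.Collections.Generic;

public enum WorkflowState { Abort, Continue, Defer }

// Empty stand-in for the article's WorkflowContinuation<T>, so this compiles alone.
public class WorkflowContinuation<T> { }

public static class BlacklistStep
{
  // Hypothetical blacklist; a real server would load this from configuration.
  public static readonly HashSet<string> Blocked = new HashSet<string> { "203.0.113.7" };

  // Same shape as Func<WorkflowContinuation<T>, T, WorkflowState>, with T = string.
  public static WorkflowState CheckBlacklist(
        WorkflowContinuation<string> workflowContinuation,
        string clientAddress)
  {
    // Abort stops the workflow; Continue lets the next step run.
    return Blocked.Contains(clientAddress)
        ? WorkflowState.Abort
        : WorkflowState.Continue;
  }
}
```

Returning Abort means no later step runs for that request, so a rejected client never reaches the queue.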

Workflow Item

A WorkflowItem is a lightweight container for the workflow function:

/// <summary>
/// A workflow item is a specific process to execute in the workflow.
/// </summary>
public class WorkflowItem<T>
{
  protected Func<WorkflowContinuation<T>, T, WorkflowState> doWork;

  /// <summary>
  /// Instantiate a workflow item. We take a function that takes the WorkflowContinuation instance
  /// for the current execution and a data item. We expect a WorkflowState to be returned.
  /// </summary>
  /// <param name="doWork">The function to execute for this step of the workflow.</param>
  public WorkflowItem(Func<WorkflowContinuation<T>, T, WorkflowState> doWork)
  {
    this.doWork = doWork;
  }

  /// <summary>
  /// Execute the workflow item method.
  /// </summary>
  public WorkflowState Execute(WorkflowContinuation<T> workflowContinuation, T data)
  {
    return doWork(workflowContinuation, data);
  }
}

The Workflow Class

The Workflow class manages a workflow: the ordered list of functions to execute.

/// <summary>
/// The Workflow class handles a list of workflow items that we can use to 
/// determine the processing of a request.
/// </summary>
public class Workflow<T>
{
  protected List<WorkflowItem<T>> items;

  public Workflow()
  {
    items = new List<WorkflowItem<T>>();
  }

  /// <summary>
  /// Add a workflow item.
  /// </summary>
  public void AddItem(WorkflowItem<T> item)
  {
    items.Add(item);
  }

  /// <summary>
  /// Execute the workflow from the beginning.
  /// </summary>
  public void Execute(T data)
  {
    WorkflowContinuation<T> continuation = new WorkflowContinuation<T>(this);
    InternalContinue(continuation, data);
  }

  /// <summary>
  /// Continue a deferred workflow, unless it is aborted.
  /// </summary>
  public void Continue(WorkflowContinuation<T> wc, T data)
  {
    // TODO: Throw exception instead?
    if (!wc.Abort)
    {
      wc.Defer = false;
      InternalContinue(wc, data);
    }
  }

  /// <summary>
  /// Internally, we execute workflow steps until:
  /// 1. we reach the end of the workflow chain
  /// 2. we are instructed to abort the workflow
  /// 3. we are instructed to defer execution until later.
  /// </summary>
  protected void InternalContinue(WorkflowContinuation<T> wc, T data)
  {
    while ((wc.WorkflowStep < items.Count) && !wc.Abort && !wc.Defer)
    {
      WorkflowState state = items[wc.WorkflowStep++].Execute(wc, data);

      switch (state)
      {
        case WorkflowState.Abort:
          wc.Abort = true;
          break;

        case WorkflowState.Defer:
          wc.Defer = true;
          break;
      }
    }
  }
}  

There is a nice simple elegance to this -- a workflow is started by calling the Execute method.  If a function passes the work on to another thread, it returns the Defer state.  The thread that picks up the workflow can continue the workflow by calling the Continue method.
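
To see the Execute / Defer / Continue sequence without any threading in the way, here is a condensed, single-threaded sketch.  It repeats the classes above in compact form, drops the WorkflowItem wrapper for brevity, and uses a string as the data type.  DeferDemo and the parked variable are hypothetical names of mine, not part of the article's code:

```csharp
using System;
using System.Collections.Generic;

public enum WorkflowState { Abort, Continue, Defer }

public class WorkflowContinuation<T>
{
  public int WorkflowStep { get; set; }
  public bool Abort { get; set; }
  public bool Defer { get; set; }
  public Workflow<T> Workflow { get; protected set; }
  public WorkflowContinuation(Workflow<T> workflow) { Workflow = workflow; }
}

public class Workflow<T>
{
  protected List<Func<WorkflowContinuation<T>, T, WorkflowState>> items =
      new List<Func<WorkflowContinuation<T>, T, WorkflowState>>();

  public void AddItem(Func<WorkflowContinuation<T>, T, WorkflowState> item) { items.Add(item); }

  public void Execute(T data) { InternalContinue(new WorkflowContinuation<T>(this), data); }

  public void Continue(WorkflowContinuation<T> wc, T data)
  {
    if (!wc.Abort) { wc.Defer = false; InternalContinue(wc, data); }
  }

  protected void InternalContinue(WorkflowContinuation<T> wc, T data)
  {
    while ((wc.WorkflowStep < items.Count) && !wc.Abort && !wc.Defer)
    {
      WorkflowState state = items[wc.WorkflowStep++](wc, data);
      if (state == WorkflowState.Abort) wc.Abort = true;
      else if (state == WorkflowState.Defer) wc.Defer = true;
    }
  }
}

public static class DeferDemo
{
  // Runs the first two steps via Execute, then the third via Continue.
  public static List<string> Run()
  {
    var log = new List<string>();
    WorkflowContinuation<string> parked = null;

    var workflow = new Workflow<string>();
    workflow.AddItem((wc, data) => { log.Add("first:" + data); return WorkflowState.Continue; });
    workflow.AddItem((wc, data) => { log.Add("park:" + data); parked = wc; return WorkflowState.Defer; });
    workflow.AddItem((wc, data) => { log.Add("last:" + data); return WorkflowState.Continue; });

    workflow.Execute("req");                 // stops after the deferring step
    log.Add("-- deferred --");
    parked.Workflow.Continue(parked, "req"); // resumes at the third step
    return log;
  }
}
```

DeferDemo.Run() returns the entries "first:req", "park:req", "-- deferred --", "last:req" in that order: the third step only runs once Continue is called with the parked continuation.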

The Workflow Continuation

The real meat though, which makes all this work, is the WorkflowContinuation class:

/// <summary>
/// Per-execution instance that preserves the workflow continuation context as it moves between threads.
/// </summary>
public class WorkflowContinuation<T>
{
  public int WorkflowStep { get; set; }
  public bool Abort { get; set; }
  public bool Defer { get; set; }
  public Workflow<T> Workflow { get; protected set; }

  public WorkflowContinuation(Workflow<T> workflow)
  {
    Workflow = workflow;
  }
}

Well, for being the "meat", it doesn't seem like it does much!  The point, though, is that this class tracks the state of one workflow execution and allows the workflow to continue when it is passed to another thread.  This means that:

  1. We can define a single instance of a particular workflow pattern
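  2. We can use that instance from several threads simultaneously because we are effectively implementing Continuation Passing Style -- we are passing in the continuation state to each workflow function.
  3. As a result, the workflow, as a process, is thread safe even though we are sharing instances amongst different threads, provided the list of workflow items is not modified once executions have started and each continuation is advanced by only one thread at a time.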
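
The three points above can be illustrated with a small sketch: one Workflow instance, many concurrent executions, each with its own continuation and its own data.  The classes are condensed copies of the ones in this article (without the WorkflowItem wrapper); SharedWorkflowDemo, the int[] data type, and the use of Parallel.For are my own assumptions for the demonstration:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public enum WorkflowState { Abort, Continue, Defer }

public class WorkflowContinuation<T>
{
  public int WorkflowStep { get; set; }
  public bool Abort { get; set; }
  public bool Defer { get; set; }
  public Workflow<T> Workflow { get; protected set; }
  public WorkflowContinuation(Workflow<T> workflow) { Workflow = workflow; }
}

public class Workflow<T>
{
  protected List<Func<WorkflowContinuation<T>, T, WorkflowState>> items =
      new List<Func<WorkflowContinuation<T>, T, WorkflowState>>();

  public void AddItem(Func<WorkflowContinuation<T>, T, WorkflowState> item) { items.Add(item); }

  public void Execute(T data)
  {
    // All mutable state for this run lives in the continuation, not in the workflow.
    var wc = new WorkflowContinuation<T>(this);
    while ((wc.WorkflowStep < items.Count) && !wc.Abort && !wc.Defer)
    {
      WorkflowState state = items[wc.WorkflowStep++](wc, data);
      if (state == WorkflowState.Abort) wc.Abort = true;
      else if (state == WorkflowState.Defer) wc.Defer = true;
    }
  }
}

public static class SharedWorkflowDemo
{
  // Returns true when every concurrent execution ran both steps exactly once.
  public static bool Run(int runs)
  {
    var workflow = new Workflow<int[]>();
    workflow.AddItem((wc, data) => { data[0] += 1; return WorkflowState.Continue; });
    workflow.AddItem((wc, data) => { data[0] += 10; return WorkflowState.Continue; });

    var results = new int[runs][];
    Parallel.For(0, runs, i =>
    {
      results[i] = new int[1];      // per-execution data, never shared between runs
      workflow.Execute(results[i]); // same Workflow instance on every thread
    });

    foreach (int[] r in results)
    {
      if (r[0] != 11) return false;
    }
    return true;
  }
}
```

The only shared object is the item list, and it is only read after setup, which is why no locking is needed in this sketch.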

The only performance penalty is that, when a workflow begins its execution, we have to create the WorkflowContinuation:

WorkflowContinuation<T> continuation = new WorkflowContinuation<T>(this);

Otherwise, there's nothing else that needs to be allocated to get all this to work. 

Some Examples

Some basic examples should help to get the idea.  Here's how a basic workflow (not the one in the diagram above) can be declared:

static void InitializeWorkflow()
{
  workflow = new Workflow<HttpListenerContext>();
  workflow.AddItem(new WorkflowItem<HttpListenerContext>(LogIPAddress));
  workflow.AddItem(new WorkflowItem<HttpListenerContext>(handler.Process));
  workflow.AddItem(new WorkflowItem<HttpListenerContext>(CommonHandler.CommonResponse));
}

When we're ready, we can start the workflow with an instance of our data, in this case an HttpListenerContext:

workflow.Execute(context);

You already saw an example of "instrumentation":

static WorkflowState LogIPAddress(
       WorkflowContinuation<HttpListenerContext> workflowContinuation, 
       HttpListenerContext context)
{
  Console.WriteLine(context.Request.RemoteEndPoint.ToString());

  return WorkflowState.Continue;
}

Note how the workflow continuation is passed in (though we don't use it here) and note how the function returns the Continue state.

Now at some point, the data will be enqueued so that another thread can process it:

public WorkflowState Process(
       WorkflowContinuation<HttpListenerContext> workflowContinuation, 
       HttpListenerContext context)
{
  // Create a workflow context and queue it.
  requests.Enqueue(new WorkflowContext(workflowContinuation, context));
  semQueue.Release();

  return WorkflowState.Defer;
}

Notice here how the continuation is enqueued along with the request context.  Notice also that this function returns the workflow state Defer.  This indicates to the workflow engine that the continuation of this workflow is being deferred until another thread picks up the work.
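
The WorkflowContext class is not listed in this article.  Judging only from how it is constructed here and how its members are read back when the work is dequeued (context.WorkflowContinuation and context.Context), it is presumably a simple pair.  A sketch under that assumption, with an empty stand-in for the continuation class so that it compiles on its own:

```csharp
using System.Net;

// Stand-in for the article's WorkflowContinuation<T>, so this compiles alone.
public class WorkflowContinuation<T> { }

// Hypothetical reconstruction: pairs a continuation with the request it belongs to.
public class WorkflowContext
{
  public WorkflowContinuation<HttpListenerContext> WorkflowContinuation { get; private set; }
  public HttpListenerContext Context { get; private set; }

  public WorkflowContext(
        WorkflowContinuation<HttpListenerContext> workflowContinuation,
        HttpListenerContext context)
  {
    WorkflowContinuation = workflowContinuation;
    Context = context;
  }
}
```

Keeping the pair immutable means the receiving thread gets exactly the continuation and request that were queued.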

Lastly, the thread that picks up the work calls the Continue function:

ts.WaitOne();
WorkflowContext context;

if (ts.TryDequeue(out context))
{
  // Continue with where we left off for this context's workflow.
  context.WorkflowContinuation.Workflow.Continue(context.WorkflowContinuation, context.Context);
}
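
Putting the hand-off together, here is a self-contained two-thread sketch.  It is not the web server's code: the BlockingCollection queue, the Tuple used in place of WorkflowContext, and the TwoThreadDemo class are assumptions of mine.  It also varies the engine loop slightly.  As far as I can tell from the listing above, InternalContinue sets wc.Defer = true after the deferring step has already queued the continuation, so a fast consumer could call Continue before that assignment happens.  The variant below keeps the loop decision in a local variable, so the deferring thread does not touch the continuation again once the step has returned Defer:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;

public enum WorkflowState { Abort, Continue, Defer }

public class WorkflowContinuation<T>
{
  public int WorkflowStep { get; set; }
  public bool Abort { get; set; }
  public Workflow<T> Workflow { get; private set; }
  public WorkflowContinuation(Workflow<T> workflow) { Workflow = workflow; }
}

public class Workflow<T>
{
  private readonly List<Func<WorkflowContinuation<T>, T, WorkflowState>> items =
      new List<Func<WorkflowContinuation<T>, T, WorkflowState>>();

  public void AddItem(Func<WorkflowContinuation<T>, T, WorkflowState> item) { items.Add(item); }
  public void Execute(T data) { Run(new WorkflowContinuation<T>(this), data); }
  public void Continue(WorkflowContinuation<T> wc, T data) { if (!wc.Abort) Run(wc, data); }

  private void Run(WorkflowContinuation<T> wc, T data)
  {
    // The state is a local: after a step returns Defer, this thread
    // neither reads nor writes wc again.
    WorkflowState state = WorkflowState.Continue;
    while (state == WorkflowState.Continue && wc.WorkflowStep < items.Count)
    {
      // WorkflowStep is advanced before the step runs, so a queued
      // continuation already points at the next step.
      state = items[wc.WorkflowStep++](wc, data);
    }
    if (state == WorkflowState.Abort) wc.Abort = true;
  }
}

public static class TwoThreadDemo
{
  public static List<string> Run()
  {
    int callerId = Thread.CurrentThread.ManagedThreadId;
    Func<string> where = () =>
        Thread.CurrentThread.ManagedThreadId == callerId ? "caller" : "worker";
    var log = new ConcurrentQueue<string>();

    using (var queue = new BlockingCollection<Tuple<WorkflowContinuation<string>, string>>())
    {
      var workflow = new Workflow<string>();
      workflow.AddItem((wc, data) => { log.Enqueue(where() + ":log " + data); return WorkflowState.Continue; });
      workflow.AddItem((wc, data) =>
      {
        log.Enqueue(where() + ":enqueue " + data);
        queue.Add(Tuple.Create(wc, data)); // hand off continuation and data
        return WorkflowState.Defer;
      });
      workflow.AddItem((wc, data) => { log.Enqueue(where() + ":render " + data); return WorkflowState.Continue; });

      var worker = new Thread(() =>
      {
        var work = queue.Take(); // blocks until the hand-off arrives
        work.Item1.Workflow.Continue(work.Item1, work.Item2);
      });
      worker.Start();

      workflow.Execute("req"); // returns as soon as the request is queued
      worker.Join();
    }
    return new List<string>(log);
  }
}
```

TwoThreadDemo.Run() returns "caller:log req", "caller:enqueue req", "worker:render req": the first two steps run on the calling thread and the last one on the worker.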

Conclusion

That's it -- there really isn't much to this concept.  It actually seems to take longer to explain the concept than to write the code that implements it.

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)