Introduction
The goal of this article is to show how to use the async/await feature, which ships with .NET 4.5, to easily implement the finite-state machine pattern.
Background
According to Wikipedia:
A finite-state machine is conceived as an abstract machine that can be in one of a finite number of states. The machine is in only one state at a time.
The pattern can be very useful when you are dealing with, for example, complex interactions between different parts of a system.
The most obvious example of FSM usage is client-server collaboration, so I'll stick to this example throughout the article.
The problem
Let’s say we have to implement a master-worker computation system. A user submits a job to the master; the master splits the job into tasks and sends them to a group of workers for execution. Each worker executes its task and sends the result back to the master.
Treating both the master and the worker as finite-state machines, we can define their states.
Master’s states:
- Waiting for a job
- Planning a job (splitting a job into multiple tasks)
- Scheduling tasks (sending a portion of tasks to a selected worker)
- Waiting for results
Worker’s states:
- Waiting for a task
- Executing a task
- Sending a result
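So the workflow for the master would be:
[Diagram: the master's states and transitions]
Waiting for a job → (a job is available) → Planning a job → Scheduling tasks → Waiting for results → (all results received) → Waiting for a job
The worker’s states and transitions are trivial, so I omit them.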
The diagram clearly describes what’s going on with the master.
Let’s see how we’d implement that in C#.
Typical implementation
A finite-state machine consists of states and transitions. Each transition is activated when a certain event occurs (e.g., a worker connects or a task result is received) and the current state allows it (e.g., the number of tasks being executed is 0).
Thus, the goal is to write the described workflow (the diagram) in C#.
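To make the definition above concrete, here is a minimal sketch of the master's transitions written as an explicit lookup table keyed by (current state, event). This is not how the article's Master is built (it hard-codes each transition inside a handler); the MasterEvent enum, the MasterTransitions class and its Fire method are hypothetical names for illustration, and the code assumes C# 7 value tuples.

```csharp
using System;
using System.Collections.Generic;

public enum MasterState { WaitingForJob, PlanningJob, SchedulingTasks, WaitingForResults }

// Hypothetical events that move the master from one state to the next.
public enum MasterEvent { JobAvailable, TasksPlanned, TasksScheduled, AllResultsReceived }

public sealed class MasterTransitions {
    // (current state, event) -> next state. A pair missing from the table is an invalid transition.
    private static readonly Dictionary<(MasterState, MasterEvent), MasterState> Table =
        new Dictionary<(MasterState, MasterEvent), MasterState> {
            [(MasterState.WaitingForJob, MasterEvent.JobAvailable)] = MasterState.PlanningJob,
            [(MasterState.PlanningJob, MasterEvent.TasksPlanned)] = MasterState.SchedulingTasks,
            [(MasterState.SchedulingTasks, MasterEvent.TasksScheduled)] = MasterState.WaitingForResults,
            [(MasterState.WaitingForResults, MasterEvent.AllResultsReceived)] = MasterState.WaitingForJob,
        };

    public MasterState State { get; private set; } = MasterState.WaitingForJob;

    // Applies an event: the transition fires only if the current state allows it.
    public void Fire(MasterEvent e) {
        if (!Table.TryGetValue((State, e), out var next))
            throw new InvalidOperationException($"Invalid transition: {e} in state {State}");
        State = next;
    }
}
```

Here every legal transition is listed in one place; the work performed on each transition (planning, scheduling, collecting results) would still have to be attached separately.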
For the sake of simplicity, let us assume that we have two classes: Master and Worker.
The Worker class isn't that interesting at the moment, so I omit its implementation details and focus on Master.
The code must be thread-safe because of the event-driven design (each event handler could be called from a different thread). We could use locks, rely on thread-safe collections/primitives, or use a synchronization context. I prefer the third option because it keeps the code clean and simple.
class Master {
    public SynchronizationContext SynchronizationContext { get; set; }
}
All event handlers must be invoked in this synchronization context. As long as the context runs posted callbacks one at a time (as a UI thread's context does), we no longer have to care about race conditions.
In a real-world application, it is good practice to create facade-like public event handlers that make sure the actual handler is invoked in the master's synchronization context:
public void OnEventOccurred(...) {
    // Marshal the call onto the master's context; the real work happens in the internal handler.
    this.SynchronizationContext.Post(o => this.OnEventOccurredInternal(...), null);
}
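The two ideas above (a synchronization context that serializes all handlers, and a public facade that only posts) can be sketched in a self-contained way. SingleThreadContext, Counter and FacadeDemo are hypothetical names: Counter stands in for Master, and its internal handler increments a plain int without any lock. This is an illustration under those assumptions, not a production message pump (it has no error handling, for example).

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Sketch: runs every posted callback, one at a time, on a single dedicated thread.
public sealed class SingleThreadContext : SynchronizationContext, IDisposable {
    private readonly BlockingCollection<Action> queue = new BlockingCollection<Action>();
    private readonly Thread thread;

    public SingleThreadContext() {
        thread = new Thread(() => {
            SetSynchronizationContext(this);
            foreach (var work in queue.GetConsumingEnumerable())
                work();
        }) { IsBackground = true };
        thread.Start();
    }

    public override void Post(SendOrPostCallback d, object state) => queue.Add(() => d(state));

    public override void Send(SendOrPostCallback d, object state) =>
        throw new NotSupportedException("Synchronous Send is not supported in this sketch.");

    // Stops accepting work and waits until every queued callback has run.
    public void Dispose() {
        queue.CompleteAdding();
        thread.Join();
    }
}

// Hypothetical stand-in for Master: a facade handler that posts, and an internal one that does the work.
public sealed class Counter {
    public SynchronizationContext SynchronizationContext { get; set; }
    public int Count { get; private set; }

    public void OnEventOccurred() =>
        this.SynchronizationContext.Post(o => this.OnEventOccurredInternal(), null);

    // Always runs on the context's thread, so no lock is needed.
    private void OnEventOccurredInternal() => this.Count++;
}

public static class FacadeDemo {
    // Raises 'perThread' events from each of 'threads' concurrent tasks and returns the final count.
    public static int Run(int threads, int perThread) {
        var counter = new Counter();
        using (var context = new SingleThreadContext()) {
            counter.SynchronizationContext = context;
            var producers = new Task[threads];
            for (var t = 0; t < threads; t++)
                producers[t] = Task.Run(() => {
                    for (var i = 0; i < perThread; i++)
                        counter.OnEventOccurred();
                });
            Task.WaitAll(producers);
        } // Dispose drains the queue before Count is read.
        return counter.Count;
    }
}
```

Calling Count++ directly from four threads could lose increments; routed through the context, the increments are serialized, so FacadeDemo.Run(4, 1000) should return 4000.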
Besides, we would like to be able to test some key behaviors of the master (e.g., it should wait for a worker to connect, or it should distribute tasks among all connected workers).
The Master class has an internal state and public event handlers.
First, we explicitly define our states:
public enum MasterState {
    WaitingForJob,
    PlanningJob,
    SchedulingTasks,
    WaitingForResults
}
Then we define event handlers. Each event handler consists of:
- Validation logic (is it OK that the event handler was invoked in the current Master state?)
- Transition validation (can it change the current state?)
- State change (transition from one state to another)
public void OnPlanningJob() {
    if (this.State != MasterState.WaitingForJob)
        throw new InvalidOperationException("Invalid transition");
    this.State = MasterState.PlanningJob;
    // Split the job into one task per line.
    this.Tasks = this.CurrentJob.Lines.Select(line => new JobTask {Text = line}).ToList();
    this.OnSchedulingTasks();
}

private void OnSchedulingTasks() {
    if (this.State != MasterState.PlanningJob)
        throw new InvalidOperationException("Invalid transition");
    this.State = MasterState.SchedulingTasks;
    this.ScheduleTasks();
}

private void OnWaitingForResults() {
    if (this.State != MasterState.SchedulingTasks)
        throw new InvalidOperationException("Invalid transition");
    this.State = MasterState.WaitingForResults;
}

public void OnTaskResultReceived(TaskResult result) {
    if (this.State != MasterState.WaitingForResults)
        throw new InvalidOperationException("Invalid state");
    this.Results.Add(result);
    this.Tasks.RemoveAll(task => task.Id == result.TaskId);
    // Stay in this state until every task has reported back.
    if (this.Tasks.Count > 0)
        return;
    this.CurrentJob.Result = this.Results.Sum(x => x.Data);
    this.JobFinished(this.CurrentJob);
    this.OnWaitingForJob();
}

private void OnWaitingForJob() {
    this.Results.Clear();
    this.State = MasterState.WaitingForJob;
    // Pick up the next queued job, if any.
    if (this.JobQueue.Any()) {
        this.CurrentJob = this.JobQueue.Dequeue();
        this.OnPlanningJob();
    }
}

public void OnWorkerConnected(Guid workerId) {
    this.Workers.Add(workerId);
    // Scheduling is still in progress: retry now that another worker is available.
    if (this.State == MasterState.SchedulingTasks)
        this.ScheduleTasks();
}

public void OnWorkerDisconnected(Guid workerId) {
    this.Workers.Remove(workerId);
}
The code works as expected, but there is a problem: it is hard to trace the workflow across these event handlers.
The workflow is spread throughout the class, so the code is hard to change. To demonstrate this, let's add a timeout to the "waiting for a worker" operation.
First, we add a new state called WaitingForWorker:
internal enum MasterState {
    WaitingForJob,
    PlanningJob,
    WaitingForWorker,
    SchedulingTasks,
    WaitingForResults
}
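Then we add a timer (a System.Threading.Timer, initially disabled). A field initializer cannot reference this, so the timer is created in the constructor:
private readonly Timer workerTimeoutTimer;
// In the Master constructor; Timeout.Infinite as the due time means "not started".
this.workerTimeoutTimer = new Timer(this.CheckWorkerTimeout, null, Timeout.Infinite, Timeout.Infinite);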
The transition into the new state is:
public void OnWaitingForWorker() {
    if (this.State != MasterState.PlanningJob)
        throw new InvalidOperationException("Invalid transition");
    this.State = MasterState.WaitingForWorker;
    if (this.Workers.Count == 0)
        this.workerTimeoutTimer.Change(this.WorkerTimeout, Timeout.InfiniteTimeSpan);
    else
        this.OnSchedulingTasks();
}
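The timer callback runs on a thread-pool thread, so it posts back to the synchronization context:
private void CheckWorkerTimeout(object parameter) {
    this.SynchronizationContext.Post(o => {
        // Ignore the timeout if the master has already left WaitingForWorker.
        if (this.State != MasterState.WaitingForWorker)
            return;
        this.WorkerTimedout();
        this.OnWaitingForJob();
    }, null);
}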
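Then we modify the other event handlers to alter the workflow. OnWorkerConnected must change as well; otherwise a worker that connects while the master is in WaitingForWorker would be ignored until the timeout fires:
public void OnPlanningJob() {
    if (this.State != MasterState.WaitingForJob)
        throw new InvalidOperationException("Invalid transition");
    this.State = MasterState.PlanningJob;
    this.Tasks = this.CurrentJob.Lines.Select(line => new JobTask {Text = line}).ToList();
    this.OnWaitingForWorker();
}

public void OnSchedulingTasks() {
    if (this.State != MasterState.WaitingForWorker)
        throw new InvalidOperationException("Invalid transition");
    this.State = MasterState.SchedulingTasks;
    this.ScheduleTasks();
}

public void OnWorkerConnected(Guid workerId) {
    this.Workers.Add(workerId);
    if (this.State == MasterState.WaitingForWorker) {
        // A worker arrived in time: disarm the timeout and continue.
        this.workerTimeoutTimer.Change(Timeout.Infinite, Timeout.Infinite);
        this.OnSchedulingTasks();
    } else if (this.State == MasterState.SchedulingTasks) {
        this.ScheduleTasks();
    }
}
We had to modify three methods and add two new ones (plus a new state and a timer) to alter the workflow. Each new requirement increases the code complexity, and eventually the code becomes hard to maintain and modify.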
Let us consider an alternative way to implement a finite-state machine.
Async/await implementation
In .NET 4.5 (and C# 5.0), Microsoft introduced a new way to handle chains of asynchronous methods: the async and await keywords.
Basically, when you write an async method, the compiler transforms it into a state machine, similar to the one it generates for an iterator that uses the yield keyword. For further details, please check out this article.
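As a small illustration of the comparison with yield, the hypothetical iterator below walks through the worker's three states from the problem statement. There is no state variable: the "current state" is simply the point where the iterator is paused, and the compiler generates the class that remembers it. WorkerLifecycle and WorkerStates are illustrative names, not part of the article's code.

```csharp
using System.Collections.Generic;

public static class WorkerLifecycle {
    // Each MoveNext() advances to the next yield; the compiler-generated
    // state machine records which yield the method is paused at.
    public static IEnumerable<string> WorkerStates() {
        while (true) {
            yield return "Waiting for a task";
            yield return "Executing a task";
            yield return "Sending a result";
        }
    }
}
```

An async method is compiled in a similar way, except that its generated MoveNext() is invoked by the continuation of the awaited task rather than by the consumer of a sequence.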
Let’s take advantage of this shiny new feature and rewrite our previous code:
private async void ExecuteJobs() {
    // Every await below captures this context, so each "part" resumes on it.
    SynchronizationContext.SetSynchronizationContext(this.SynchronizationContext);
    var cancellationToken = this.cancellationTokenSource.Token;
    while (cancellationToken.IsCancellationRequested == false) {
        // Waiting for a job
        var job = await this.GetJobAsync(cancellationToken);
        // Planning a job
        var tasks = job.Lines.Select(line => new JobTask {Text = line}).ToList();
        var results = new List<TaskResult>();
        // Waiting for at least one worker, then scheduling tasks round-robin
        var workers = await this.GetWorkersAsync(cancellationToken);
        var i = 0;
        foreach (var jobTask in tasks)
            this.ScheduleTask(jobTask, workers[i++ % workers.Count]);
        // Waiting for results
        while (cancellationToken.IsCancellationRequested == false) {
            var result = await this.ReceiveTaskResultAsync(cancellationToken);
            tasks.RemoveAll(task => task.Id == result.TaskId);
            results.Add(result);
            if (tasks.Count == 0)
                break;
        }
        job.Result = results.Sum(x => x.Data);
        this.JobFinished(job);
    }
}
That's it. The whole workflow fits into one simple method! All the other methods are pure utilities:
private static Task<TResult> WaitFor<TResult>(
    Action<Action<TResult>> subscribe,
    Action<Action<TResult>> unsubscribe,
    CancellationToken cancellationToken) {
    var source = new TaskCompletionSource<TResult>();
    // One-shot handler: completes the task on the first event, then detaches itself.
    Action<TResult> handler = null;
    handler = result => {
        unsubscribe(handler);
        source.TrySetResult(result);
    };
    subscribe(handler);
    // On cancellation, detach the handler and cancel the task.
    var registration = cancellationToken.Register(() => {
        unsubscribe(handler);
        source.TrySetCanceled();
    });
    // Release the registration once the task completes, so that registrations
    // don't pile up on the master's long-lived token.
    source.Task.ContinueWith(
        _ => registration.Dispose(),
        CancellationToken.None,
        TaskContinuationOptions.ExecuteSynchronously,
        TaskScheduler.Default);
    return source.Task;
}

private Task<Job> ReceiveJobAsync(CancellationToken cancellationToken) {
    return WaitFor<Job>(
        handler => this.JobReceived += handler,
        handler => this.JobReceived -= handler,
        cancellationToken);
}

private Task<TaskResult> ReceiveTaskResultAsync(CancellationToken cancellationToken) {
    return WaitFor<TaskResult>(
        handler => this.TaskResultReceived += handler,
        handler => this.TaskResultReceived -= handler,
        cancellationToken);
}

private async Task<IReadOnlyList<Guid>> GetWorkersAsync(CancellationToken cancellationToken) {
    if (this.connectedWorkers.Count == 0)
        return new[] {await this.WaitForWorkerToConnect(cancellationToken)};
    return this.connectedWorkers;
}

private Task<Guid> WaitForWorkerToConnect(CancellationToken cancellationToken) {
    return WaitFor<Guid>(
        handler => this.WorkerConnected += handler,
        handler => this.WorkerConnected -= handler,
        cancellationToken);
}

private async Task<Job> GetJobAsync(CancellationToken cancellationToken) {
    while (this.jobQueue.Count == 0)
        await this.ReceiveJobAsync(cancellationToken);
    return this.jobQueue.Dequeue();
}

public void OnJobReceived(Job job) {
    this.jobQueue.Enqueue(job);
    this.JobReceived(job);
}

public void OnTaskResultReceived(TaskResult result) {
    this.TaskResultReceived(result);
}

public void OnWorkerConnected(Guid worker) {
    this.connectedWorkers.Add(worker);
    this.WorkerConnected(worker);
}

public void OnWorkerDisconnected(Guid worker) {
    this.connectedWorkers.Remove(worker);
    this.WorkerDisconnected(worker);
}

// Initialized with no-op handlers so they can be raised without null checks.
private event Action<Job> JobReceived = _ => { };
private event Action<Guid> WorkerConnected = _ => { };
private event Action<Guid> WorkerDisconnected = _ => { };
private event Action<TaskResult> TaskResultReceived = _ => { };
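WaitFor is the piece that turns an event into something awaitable. The standalone sketch below exercises that pattern outside Master: a one-shot handler completes a TaskCompletionSource, and cancellation detaches the handler and cancels the task. ResultSource and EventAwaiter are hypothetical names, and the event source is a stand-in for Master's private events.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical event source standing in for Master's private events.
public sealed class ResultSource {
    public event Action<int> ResultReceived;
    public bool HasSubscribers => this.ResultReceived != null;
    public void Raise(int value) => this.ResultReceived?.Invoke(value);
}

public static class EventAwaiter {
    // Completes with the argument of the next event, or is cancelled via the token.
    public static Task<T> WaitFor<T>(
        Action<Action<T>> subscribe,
        Action<Action<T>> unsubscribe,
        CancellationToken cancellationToken) {
        var source = new TaskCompletionSource<T>();
        Action<T> handler = null;
        handler = value => {
            unsubscribe(handler);   // one-shot: react to a single occurrence
            source.TrySetResult(value);
        };
        subscribe(handler);
        var registration = cancellationToken.Register(() => {
            unsubscribe(handler);
            source.TrySetCanceled();
        });
        // Free the registration once the task is done, whichever way it completed.
        source.Task.ContinueWith(
            _ => registration.Dispose(),
            CancellationToken.None,
            TaskContinuationOptions.ExecuteSynchronously,
            TaskScheduler.Default);
        return source.Task;
    }
}
```

Because the handler removes itself, each awaited call observes exactly one event; a caller that wants the next one simply calls WaitFor again, which is what the loops in ExecuteJobs and GetJobAsync do.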
All the states are implicit (each one corresponds to an await point in ExecuteJobs), which means that you can easily modify the workflow.
Let's see how it copes with a change in the workflow by adding the timeout functionality from the first example:
private async void ExecuteJobs() {
    SynchronizationContext.SetSynchronizationContext(this.SynchronizationContext);
    var cancellationToken = this.cancellationTokenSource.Token;
    while (cancellationToken.IsCancellationRequested == false) {
        var job = await this.GetJobAsync(cancellationToken);
        var tasks = job.Lines.Select(line => new JobTask {Text = line}).ToList();
        var results = new List<TaskResult>();
        IReadOnlyList<Guid> workers;
        // Cancelled by the master's own token or, after WorkerTimeout, by its timer.
        using (var localTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken)) {
            localTokenSource.CancelAfter(this.WorkerTimeout);
            try {
                workers = await this.GetWorkersAsync(localTokenSource.Token);
            } catch (OperationCanceledException) {
                if (cancellationToken.IsCancellationRequested)
                    break; // the master is stopping; this is not a worker timeout
                this.WorkerTimedout();
                continue;
            }
        }
        var i = 0;
        foreach (var jobTask in tasks)
            this.ScheduleTask(jobTask, workers[i++ % workers.Count]);
        while (cancellationToken.IsCancellationRequested == false) {
            var result = await this.ReceiveTaskResultAsync(cancellationToken);
            tasks.RemoveAll(task => task.Id == result.TaskId);
            results.Add(result);
            if (tasks.Count == 0)
                break;
        }
        job.Result = results.Sum(x => x.Data);
        this.JobFinished(job);
    }
}
All modifications are confined to a single method, and we can still trace the workflow with the naked eye.
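The timeout relies on a linked token source with CancelAfter: the local token fires either when the master's own token is cancelled or when the timeout elapses. The sketch below isolates that mechanism; TimeoutDemo, WaitForWorkerAsync and the result strings are hypothetical, the waitForWorker delegate stands in for GetWorkersAsync, and checking the outer token to tell a shutdown from a timeout is an assumption of this sketch.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class TimeoutDemo {
    // Waits for a worker, giving up after 'timeout' or when 'shutdown' is cancelled.
    public static async Task<string> WaitForWorkerAsync(
        Func<CancellationToken, Task> waitForWorker,
        TimeSpan timeout,
        CancellationToken shutdown) {
        // The linked source is cancelled by 'shutdown' or by its own CancelAfter timer;
        // disposing it also releases that timer.
        using (var local = CancellationTokenSource.CreateLinkedTokenSource(shutdown)) {
            local.CancelAfter(timeout);
            try {
                await waitForWorker(local.Token);
                return "worker connected";
            } catch (OperationCanceledException) {
                // Only the outer token says whether this was a shutdown or a timeout.
                return shutdown.IsCancellationRequested ? "shutting down" : "worker timed out";
            }
        }
    }
}
```

For example, passing ct => Task.Delay(Timeout.Infinite, ct) as a worker that never arrives, with a 50 ms timeout and CancellationToken.None, yields "worker timed out".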
Now let's see how we could test this code.
Tests
The ExecuteJobs method consists of multiple "parts" (roughly, the code between consecutive awaits), which are executed at different times in the synchronization context.
A typical test consists of three parts:
- Act (interact with Master using event handlers)
- React (let the gears spin)
- Verify (assert the result)
First, we need control over when these "parts" are executed. To get it, we implement another synchronization context:
using System;
using System.Collections.Generic;
using System.Threading;

// Queues posted callbacks and runs them only when the test calls RunOperations.
// The queue is not synchronized, so callbacks are expected to be posted from the test thread.
public class TestSynchronizationContext : SynchronizationContext {
    private readonly Queue<KeyValuePair<SendOrPostCallback, object>> queue =
        new Queue<KeyValuePair<SendOrPostCallback, object>>();

    public override void Post(SendOrPostCallback d, object state) {
        if (d == null)
            throw new ArgumentNullException("d");
        this.queue.Enqueue(new KeyValuePair<SendOrPostCallback, object>(d, state));
    }

    public override void Send(SendOrPostCallback d, object state) {
        throw new NotSupportedException("Synchronously sending is not supported.");
    }

    // Runs pending callbacks, including those posted while running; returns false if there were none.
    public bool RunOperations() {
        if (this.queue.Count == 0)
            return false;
        while (this.queue.Count > 0) {
            var item = this.queue.Dequeue();
            item.Key(item.Value);
        }
        return true;
    }
}
The RunOperations method runs all pending "parts" of the ExecuteJobs method, including any new ones posted while it runs. So once RunOperations returns, we can be sure that all transitions are complete and Master is ready for interaction.
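To see why RunOperations gives a test this control, here is a self-contained sketch with a similar queue-based context and a two-part async method. ManualContext, SteppingDemo and Workflow are hypothetical names; ManualContext adds a lock because, in this sketch, the awaited task is completed on a separate thread. That thread does not have ManualContext as its current context, so the continuation is posted to the queue rather than run inline, and the second part of Workflow runs only when RunOperations is called.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Queue-based context in the spirit of TestSynchronizationContext, plus a lock.
public sealed class ManualContext : SynchronizationContext {
    private readonly Queue<Tuple<SendOrPostCallback, object>> queue =
        new Queue<Tuple<SendOrPostCallback, object>>();

    public override void Post(SendOrPostCallback d, object state) {
        lock (this.queue)
            this.queue.Enqueue(Tuple.Create(d, state));
    }

    // Runs queued callbacks (including ones they post) until the queue is empty.
    public bool RunOperations() {
        var ranAny = false;
        while (true) {
            Tuple<SendOrPostCallback, object> item;
            lock (this.queue) {
                if (this.queue.Count == 0)
                    return ranAny;
                item = this.queue.Dequeue();
            }
            item.Item1(item.Item2);
            ranAny = true;
        }
    }
}

public static class SteppingDemo {
    private static async Task Workflow(Task<int> signal, List<string> log) {
        log.Add("part 1");              // runs synchronously when Workflow is called
        var value = await signal;       // captures the current context (ManualContext)
        log.Add("part 2: " + value);    // runs only when ManualContext executes the continuation
    }

    public static List<string> Run() {
        var log = new List<string>();
        var previous = SynchronizationContext.Current;
        var context = new ManualContext();
        SynchronizationContext.SetSynchronizationContext(context);
        try {
            var signal = new TaskCompletionSource<int>();
            var workflow = Workflow(signal.Task, log);
            log.Add("after start");
            // Complete the task on another thread, where ManualContext is not current,
            // so the continuation is posted to the queue instead of running inline.
            var completer = new Thread(() => signal.SetResult(7));
            completer.Start();
            completer.Join();
            log.Add("after signal, completed: " + workflow.IsCompleted);
            context.RunOperations();
            log.Add("after RunOperations, completed: " + workflow.IsCompleted);
        } finally {
            SynchronizationContext.SetSynchronizationContext(previous);
        }
        return log;
    }
}
```

The log shows the second part running between "after signal" and "after RunOperations", which is the point at which a test can safely assert.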
Here are some complete tests:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using Xunit;

public class Tests {
    private readonly MasterAsyncAwait master;
    private readonly List<Tuple<JobTask, Guid>> scheduledTasks =
        new List<Tuple<JobTask, Guid>>();
    private readonly TestSynchronizationContext syncContext = new TestSynchronizationContext();

    public Tests() {
        SynchronizationContext.SetSynchronizationContext(this.syncContext);
        this.master = new MasterAsyncAwait {
            SynchronizationContext = this.syncContext,
            ScheduleTask = (task, workerId) =>
                this.scheduledTasks.Add(Tuple.Create(task, workerId))
        };
    }

    [Fact]
    public void SchedulesJob() {
        this.master.Start();
        var workerId = new Guid("1eb02478-cc31-4759-97b4-5d459d802e73");
        this.master.OnJobReceived(new Job {Lines = new[] {"test", "ok"}});
        this.master.OnWorkerConnected(workerId);
        this.syncContext.RunOperations();
        Assert.NotEmpty(scheduledTasks);
        Assert.True(scheduledTasks.All(task => task.Item2 == workerId));
    }

    [Fact]
    public void SplitsTasksBetweenWorkersFairly() {
        this.master.Start();
        var workerId1 = new Guid("1eb02478-cc31-4759-97b4-5d459d802e73");
        var workerId2 = new Guid("ffe9c75e-9b41-46fe-b75d-a8bb2167d43c");
        this.master.OnJobReceived(new Job {Lines = new[] {"test", "ok"}});
        this.master.OnWorkerConnected(workerId1);
        this.master.OnWorkerConnected(workerId2);
        this.syncContext.RunOperations();
        Assert.NotEmpty(scheduledTasks);
        Assert.True(scheduledTasks.Any(task => task.Item2 == workerId1));
        Assert.True(scheduledTasks.Any(task => task.Item2 == workerId2));
    }

    [Fact]
    public void ComputesJobResult() {
        var result = 0;
        this.master.JobFinished = job => result = job.Result;
        this.master.Start();
        this.master.OnJobReceived(new Job {Lines = new[] {"test"}});
        this.master.OnWorkerConnected(new Guid("1eb02478-cc31-4759-97b4-5d459d802e73"));
        this.syncContext.RunOperations();
        this.master.OnTaskResultReceived(new TaskResult {
            TaskId = this.scheduledTasks.Single().Item1.Id,
            Data = 4
        });
        this.syncContext.RunOperations();
        Assert.Equal(4, result);
    }
}
As you can see, we can't assert the current state explicitly, because it is implicit; the only way to see what's going on is to check the output of Master. So each test is a scenario (a spec) that you play through, asserting the output at the end.
You could also expose some information about what's going on inside:
public MasterStatus Status { get; private set; }

private async void ExecuteJobs() {
    ...
    this.Status = MasterStatus.WaitingForWorker;
    ...
}

public IReadOnlyCollection<Guid> ConnectedWorkers { get; private set; }
Summary
Each approach has its own pros and cons.
The typical implementation is hard to maintain, but you can verify each state explicitly, and you don't have to replay a complete scenario. Instead, you can just set the current state, act, and make sure that the transition occurs and the new state is valid.
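A sketch of what such a state-level test could look like, assuming the state can be set from test code (here through internal setters). ExplicitMaster and PendingTasks are hypothetical simplifications of the typical implementation, not the article's Master.

```csharp
using System;

public enum MasterState { WaitingForJob, PlanningJob, SchedulingTasks, WaitingForResults }

// Hypothetical, stripped-down explicit FSM: tests may set State and PendingTasks directly.
public sealed class ExplicitMaster {
    public MasterState State { get; internal set; } = MasterState.WaitingForJob;
    public int PendingTasks { get; internal set; }

    public void OnTaskResultReceived() {
        if (this.State != MasterState.WaitingForResults)
            throw new InvalidOperationException("Invalid state");
        this.PendingTasks--;
        // The last result completes the job and returns the master to idle.
        if (this.PendingTasks == 0)
            this.State = MasterState.WaitingForJob;
    }
}
```

A test can start directly from new ExplicitMaster { State = MasterState.WaitingForResults, PendingTasks = 1 }; no job, worker or scheduling step has to be replayed to get there.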
The async/await implementation gives you simplicity and flexibility. However, you can't treat the class as an FSM in unit tests: all you have is a set of interaction methods and an output. Use it wisely.