In this post we will look at two ways you can write state machines with Akka. We will first examine the more primitive (but easily understood) approach, become/unbecome, and then look at the more sophisticated approach offered by Akka FSM.
What Is A State Machine?
For those of you who do not know what a state machine is, this is what Wikipedia says about them:
A finite-state machine (FSM) or finite-state automaton (FSA, plural: automata), or simply a state machine, is a mathematical model of computation used to design both computer programs and sequential logic circuits. It is conceived as an abstract machine that can be in one of a finite number of states. The machine is in only one state at a time; the state it is in at any given time is called the current state. It can change from one state to another when initiated by a triggering event or condition; this is called a transition. A particular FSM is defined by a list of its states, and the triggering condition for each transition.
https://en.wikipedia.org/wiki/Finite-state_machine
This could be an example state machine for a coin operated barrier
Akka supports swapping the standard message loop using become, which is available via the actor's context. The message loop is the actor's receive method, which has this standard signature:
PartialFunction[Any, Unit]
By default, become replaces the current message loop. When it is called with discardOld = false, the new message loop is pushed onto a stack instead, and unbecome pops it off again.
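To make that signature concrete, here is a minimal plain-Scala sketch (no Akka involved) that treats two message loops as ordinary PartialFunction[Any, Unit] values. The names ReceiveSketch, cold, hot and log are mine and purely illustrative; the point is only that a message loop is a value, so "swapping" it means pointing at a different function.

```scala
import scala.collection.mutable.ListBuffer

object ReceiveSketch {
  // Same shape as the receive signature: PartialFunction[Any, Unit]
  type Receive = PartialFunction[Any, Unit]

  // Collects output so the behaviour is easy to inspect (illustrative only)
  val log = ListBuffer.empty[String]

  val cold: Receive = {
    case "snow" => log += "I am already cold!"
  }

  val hot: Receive = {
    case "sun" => log += "I am already hot!"
  }

  def main(args: Array[String]): Unit = {
    // A partial function only handles the messages it has a case for
    println(cold.isDefinedAt("snow")) // true
    println(cold.isDefinedAt("sun"))  // false

    // Swapping the message loop is just using a different function value
    var current: Receive = cold
    current("snow")
    current = hot
    current("sun")
    println(log.mkString(", "))
  }
}
```

In a real actor you never hold the current function yourself; become does the swapping for you.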
Become
import akka.actor.Actor
class HotColdStateActor extends Actor {
  import context._
  def cold: Receive = {
    case "snow" => println("I am already cold!")
    case "sun" => becomeHot
  }
  def hot: Receive = {
    case "sun" => println("I am already hot!")
    case "snow" => becomeCold
  }
  def receive = {
    case "snow" => becomeCold
    case "sun" => becomeHot
  }
  private def becomeCold: Unit = {
    println("becoming cold")
    become(cold)
  }
  private def becomeHot: Unit = {
    println("becoming hot")
    become(hot)
  }
}
In this example we simply use become to swap in a new message loop. Whichever message loop was passed to the most recent become call is the current one.
If we use the following demo code against this actor code
import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn
object Demo extends App {
  RunHotColdStateDemo
  def RunHotColdStateDemo : Unit = {
    val system = ActorSystem("StateMachineSystem")
    val hotColdStateActor =
      system.actorOf(Props(classOf[HotColdStateActor]),
        "demo-HotColdStateActor")
    println("sending sun")
    hotColdStateActor ! "sun"
    Thread.sleep(1000)
    println("sending sun")
    hotColdStateActor ! "sun"
    Thread.sleep(1000)
    println("sending snow")
    hotColdStateActor ! "snow"
    Thread.sleep(1000)
    println("sending snow")
    hotColdStateActor ! "snow"
    Thread.sleep(1000)
    println("sending sun")
    hotColdStateActor ! "sun"
    Thread.sleep(1000)
    println("sending snow")
    hotColdStateActor ! "snow"
    Thread.sleep(1000)
    StdIn.readLine()
    system.terminate()
    StdIn.readLine()
  }
}
We would see output like this
UnBecome
The other way to swap out the message loop relies on matching pairs of become/unbecome calls. Here the current message loop is not replaced. Instead, each become (called with discardOld = false) pushes a new message loop onto a stack, the top of the stack is used as the current message loop, and unbecome pops it to restore the previous one.
Care must be taken to ensure that the number of push (become) and pop (unbecome) operations match, otherwise memory leaks may occur, which is why this is not the default behavior.
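To see why unmatched pushes leak, here is a small plain-Scala model of a behavior stack. It is a simplified sketch written for this post, not Akka's actual implementation, and BehaviorStack, depth and ignore are hypothetical names. It mimics only the documented semantics: discardOld = true replaces the top entry, discardOld = false pushes, and unbecome pops.

```scala
object BehaviorStackSketch {
  type Receive = PartialFunction[Any, Unit]

  // Hypothetical model of an actor's behavior stack (not Akka's real code)
  final class BehaviorStack(initial: Receive) {
    private var stack: List[Receive] = List(initial)

    def become(behavior: Receive, discardOld: Boolean = true): Unit =
      stack =
        if (discardOld) behavior :: stack.tail // replace the top entry
        else behavior :: stack                 // push on top of it

    // Pop the top entry, but never remove the last remaining behavior
    def unbecome(): Unit =
      if (stack.size > 1) stack = stack.tail

    def depth: Int = stack.size
  }

  def main(args: Array[String]): Unit = {
    val ignore: Receive = { case _ => () }
    val behaviors = new BehaviorStack(ignore)

    // Unmatched pushes: the stack grows by one entry per call
    (1 to 3).foreach(_ => behaviors.become(ignore, discardOld = false))
    println(behaviors.depth) // 4

    // Matching pops bring it back down
    (1 to 3).foreach(_ => behaviors.unbecome())
    println(behaviors.depth) // 1

    // The default (discardOld = true) replaces the top entry, so nothing accumulates
    (1 to 3).foreach(_ => behaviors.become(ignore))
    println(behaviors.depth) // 1
  }
}
```

An actor that keeps calling become with discardOld = false and never calls unbecome ends up in the first situation, only without an upper bound.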
Here is an example actor that uses the become/unbecome matched operations.
import akka.actor.Actor
class BecomeUnbecomeStateActor extends Actor {
  import context._
  def receive = {
    case "snow" =>
      println("saw snow, becoming")
      become({
        case "sun" =>
          println("saw sun, unbecoming")
          unbecome()
        case _ => println("Unknown message, state only likes sun")
      }, discardOld = false)
    case _ => println("Unknown message, state only likes snow")
  }
}
Personally, I think this is harder to read and manage.
Here is some demo code to exercise this actor:
import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn
object Demo extends App {
  RunBecomeUnbecomeStateDemo
  def RunBecomeUnbecomeStateDemo : Unit = {
    val system = ActorSystem("StateMachineSystem")
    val becomeUnbecomeStateActor =
      system.actorOf(Props(classOf[BecomeUnbecomeStateActor]),
        "demo-BecomeUnbecomeStateActor")
    println("Sending snow")
    becomeUnbecomeStateActor ! "snow"
    Thread.sleep(1000)
    println("Sending snow")
    becomeUnbecomeStateActor ! "snow"
    Thread.sleep(1000)
    println("Sending sun")
    becomeUnbecomeStateActor ! "sun"
    Thread.sleep(1000)
    println("Sending sun")
    becomeUnbecomeStateActor ! "sun"
    Thread.sleep(1000)
    println("Sending snow")
    becomeUnbecomeStateActor ! "snow"
    Thread.sleep(1000)
    StdIn.readLine()
    system.terminate()
    StdIn.readLine()
  }
}
Which when run should yield the following results
Akka FSM
While become/unbecome will get the job done, Akka comes with a much better alternative called Akka FSM.
Using Akka FSM you can not only handle states but also attach data to them, and run code on the movement from one state to the next. These movements are known as “transitions”.
When using Akka FSM you may see a state machine expressed using this sort of notation
State(S) x Event(E) -> Actions (A), State(S’)
If we are in state S and the event E occurs, we should perform the actions A and make a transition to the state S’.
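As a rough illustration of that notation, here is a plain-Scala sketch (no Akka) of a coin-operated barrier written as a single transition function. The states Locked and Unlocked, the events Coin and Push, and the action strings are my own assumptions about how such a barrier might be modelled.

```scala
object TurnstileSketch {
  sealed trait State
  case object Locked extends State
  case object Unlocked extends State

  sealed trait Event
  case object Coin extends Event
  case object Push extends Event

  // State(S) x Event(E) -> Actions(A), State(S'); actions are plain strings here
  def transition(state: State, event: Event): (List[String], State) =
    (state, event) match {
      case (Locked, Coin)   => (List("unlock the barrier"), Unlocked)
      case (Locked, Push)   => (Nil, Locked)
      case (Unlocked, Push) => (List("lock the barrier"), Locked)
      case (Unlocked, Coin) => (Nil, Unlocked)
    }

  def main(args: Array[String]): Unit = {
    val events = List(Coin, Push, Push)
    // Feed the events through the machine, starting in the Locked state
    val finalState = events.foldLeft(Locked: State) { (state, event) =>
      val (actions, next) = transition(state, event)
      println(s"$state x $event -> $actions, $next")
      next
    }
    println(s"final state: $finalState")
  }
}
```

Akka FSM expresses the same idea, but splits the function up per state and lets the actor hold the current state and its data for you.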
To use Akka FSM there are a number of things you can do. Some of them are mandatory and some you can opt into depending on your requirements. Let's have a look at some of the moving parts that make Akka FSM shine.
FSM[S,D]
In order to use Akka FSM you need to mix in the FSM trait. The trait itself looks like this
trait FSM[S, D]
Where S is the state type, and D is the data type.
startWith
You can choose what state the FSM starts in by using the startWith method, which has the following signature (Timeout is an alias for Option[FiniteDuration]).
startWith(stateName: S, stateData: D, timeout: Timeout = None): Unit
initialize
Calling initialize() performs the transition into the initial state and sets up timers (if required).
when
when is used to define the handler for a given state. Inside it you control the movement to a new state using the built-in goto method, or remain in the current state using stay.
when uses pattern matching to match the events that a particular state can handle. As stated, it is completely valid either to stay in the current state or to move to a new state.
The examples that follow below will show you both stay and goto in action.
whenUnhandled
You should obviously try and make sure you cover all the correct state movements in response to all the events your FSM knows about. But Akka FSM also comes with the whenUnhandled method for catching events that were not handled by YOUR state handling (when) logic.
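Conceptually this is a fallback: try the handler for the current state first, and only if it has no matching case hand the event to the unhandled handler. The plain-Scala sketch below models that idea with orElse on partial functions. It is an illustration only, not how Akka wires things up internally, and the names whenOff, unhandled and handle are hypothetical.

```scala
object UnhandledSketch {
  // Events are plain strings here; handlers return a description of what happened
  type Handler = PartialFunction[String, String]

  // Plays the role of a when(...) block that only knows about one event
  val whenOff: Handler = {
    case "PowerOn" => "goto On"
  }

  // Plays the role of whenUnhandled: a catch-all that is only used as a fallback
  val unhandled: Handler = {
    case other => s"unhandled event: $other"
  }

  // Try the state handler first, then fall back
  val handle: Handler = whenOff.orElse(unhandled)

  def main(args: Array[String]): Unit = {
    println(handle("PowerOn"))  // goto On
    println(handle("PowerOff")) // unhandled event: PowerOff
  }
}
```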
onTransition
You may also monitor the movement from one state to the next and run some code when this occurs. This is accomplished using the onTransition method.
onTransition has the following signature
onTransition(transitionHandler: TransitionHandler): Unit
Where TransitionHandler is really just a PartialFunction that has the following generic parameters
PartialFunction[(S, S), Unit]
Where the tuple is a tuple of “from state” to “to state”.
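To show what a partial function over a (from, to) tuple looks like on its own, here is a plain-Scala sketch with no Akka dependency. The names SwitchState, seen and fire are hypothetical. Inside an FSM actor you can write the pattern as Off -> On; this sketch sticks to ordinary tuple patterns so that it compiles without Akka.

```scala
import scala.collection.mutable.ListBuffer

object TransitionHandlerSketch {
  sealed trait SwitchState
  case object On extends SwitchState
  case object Off extends SwitchState

  // Same shape as the handler described above, fixed to one state type
  type TransitionHandler = PartialFunction[(SwitchState, SwitchState), Unit]

  // Records which transitions were observed (illustrative only)
  val seen = ListBuffer.empty[String]

  val handler: TransitionHandler = {
    case (Off, On) => seen += "Off to On"
    case (On, Off) => seen += "On to Off"
  }

  // Hypothetical helper: run the handler only for transitions it has a case for
  def fire(from: SwitchState, to: SwitchState): Unit =
    if (handler.isDefinedAt((from, to))) handler((from, to))

  def main(args: Array[String]): Unit = {
    fire(Off, On)
    fire(On, On) // no matching case, so nothing is recorded
    fire(On, Off)
    println(seen.mkString(", ")) // Off to On, On to Off
  }
}
```

Because the handler is partial, you only list the transitions you care about and every other transition is silently skipped.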
Time for an example
Lightswitch Example
This first example is a very simple FSM that has two states, On and Off. It doesn’t really need any state data, but the Akka FSM trait always needs a data type. So in this case we simply use a base trait for the data, which we don’t really care about.
The idea behind this example is that the lightswitch can move from Off –> On, and On –> Off.
This example also shows a stateTimeout in action, whereby the FSM will move out of the On state if left in that state for more than 1 second.
Here is the full code for this example
import akka.actor.FSM
import scala.concurrent.duration._
// events (case objects, since they carry no data)
case object PowerOn
case object PowerOff
// states
sealed trait LightSwitchState
case object On extends LightSwitchState
case object Off extends LightSwitchState
// state data (unused here, but FSM requires a data type)
sealed trait LightSwitchData
case object NoData extends LightSwitchData
class LightSwitchActor extends FSM[LightSwitchState, LightSwitchData] {
  startWith(Off, NoData)
  when(Off) {
    case Event(PowerOn, _) =>
      goto(On) using NoData
  }
  when(On, stateTimeout = 1.second) {
    case Event(PowerOff, _) =>
      goto(Off) using NoData
    case Event(StateTimeout, _) =>
      println("'On' state timed out, moving to 'Off'")
      goto(Off) using NoData
  }
  whenUnhandled {
    case Event(e, s) =>
      log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
      goto(Off) using NoData
  }
  onTransition {
    case Off -> On => println("Moved from Off to On")
    case On -> Off => println("Moved from On to Off")
  }
  initialize()
}
Which we can run using this demo code.
import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn
object Demo extends App {
  RunLightSwitchDemo
  def RunLightSwitchDemo : Unit = {
    val system = ActorSystem("StateMachineSystem")
    val lightSwitchActor =
      system.actorOf(Props(classOf[LightSwitchActor]),
        "demo-LightSwitch")
    println("sending PowerOff, should be off already")
    lightSwitchActor ! PowerOff
    Thread.sleep(500)
    println("sending PowerOn")
    lightSwitchActor ! PowerOn
    Thread.sleep(500)
    println("sending PowerOff")
    lightSwitchActor ! PowerOff
    Thread.sleep(500)
    println("sending PowerOn")
    lightSwitchActor ! PowerOn
    Thread.sleep(500)
    println("sleep for a while to allow 'On' state to timeout")
    Thread.sleep(2000)
    StdIn.readLine()
    system.terminate()
    StdIn.readLine()
  }
}
When run we get the following results
sending PowerOff, should be off already
[WARN] [09/06/2016 07:21:28.864] [StateMachineSystem-akka.actor.default-dispatcher-4]
[akka://StateMachineSystem/user/demo-LightSwitch] received unhandled request PowerOff in state Off/NoData
sending PowerOn
Moved from Off to On
sending PowerOff
Moved from On to Off
sending PowerOn
Moved from Off to On
sleep for a while to allow 'On' state to timeout
'On' state timed out, moving to 'Off'
Moved from On to Off
There is something interesting here, which is that we see an Unhandled event. Why is this?
Well this is due to the fact that this demo FSM starts in the Off state, and we send a PowerOff event. This is not handled in the when for the Off state.
We could fix this, by amending this as follows:
when(Off) {
  case Event(PowerOn, _) =>
    goto(On) using NoData
  case Event(PowerOff, _) =>
    println("already off")
    stay()
}
If we apply this amended code and run the demo again, we now see this output instead
sending PowerOff, should be off already
already off
sending PowerOn
Moved from Off to On
sending PowerOff
Moved from On to Off
sending PowerOn
Moved from Off to On
sleep for a while to allow 'On' state to timeout
'On' state timed out, moving to 'Off'
Moved from On to Off
Buncher Example
I have basically taken this one straight from the official Akka FSM docs.
This example receives and queues messages while they arrive in a burst, and sends them on to another target actor once the burst has ended or a flush request is received.
Here is the full code for this example. It is a slightly fuller example, so this time we use a proper set of data objects for the states.
import akka.actor.{Actor, ActorRef, FSM}
import scala.concurrent.duration._
import scala.collection._
// received events
final case class SetTarget(ref: ActorRef)
final case class Queue(obj: Any)
case object Flush
// sent events
final case class Batch(obj: immutable.Seq[Any])
// states
sealed trait State
case object Idle extends State
case object Active extends State
// state data
sealed trait BuncherData
case object Uninitialized extends BuncherData
final case class Todo(target: ActorRef, queue: immutable.Seq[Any]) extends BuncherData
class BuncherActor extends FSM[State, BuncherData] {
  startWith(Idle, Uninitialized)
  when(Idle) {
    case Event(SetTarget(ref), Uninitialized) =>
      stay using Todo(ref, Vector.empty)
  }
  when(Active, stateTimeout = 1.second) {
    case Event(Flush | StateTimeout, t: Todo) =>
      goto(Idle) using t.copy(queue = Vector.empty)
  }
  whenUnhandled {
    // common code for both states
    case Event(Queue(obj), t @ Todo(_, v)) =>
      goto(Active) using t.copy(queue = v :+ obj)
    case Event(e, s) =>
      log.warning("received unhandled request {} in state {}/{}", e, stateName, s)
      stay()
  }
  onTransition {
    // stateData is still the data from before the transition, so the queue is intact
    case Active -> Idle =>
      stateData match {
        case Todo(ref, queue) => ref ! Batch(queue)
        case _ =>
      }
  }
  initialize()
}
class BunchReceivingActor extends Actor {
  def receive = {
    case Batch(theBatchData) => {
      println(s"receiving the batch data $theBatchData")
    }
    case _ => println("unknown message")
  }
}
Which we can run using this demo code
import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn
object Demo extends App {
  RunBuncherDemo
  def RunBuncherDemo : Unit = {
    val system = ActorSystem("StateMachineSystem")
    val buncherActor =
      system.actorOf(Props(classOf[BuncherActor]),
        "demo-Buncher")
    val bunchReceivingActor =
      system.actorOf(Props(classOf[BunchReceivingActor]),
        "demo-BunchReceiving")
    buncherActor ! SetTarget(bunchReceivingActor)
    println("sending Queue(42)")
    buncherActor ! Queue(42)
    println("sending Queue(43)")
    buncherActor ! Queue(43)
    println("sending Queue(44)")
    buncherActor ! Queue(44)
    println("sending Flush")
    buncherActor ! Flush
    println("sending Queue(45)")
    buncherActor ! Queue(45)
    StdIn.readLine()
    system.terminate()
    StdIn.readLine()
  }
}
When this demo runs you should see something like this
sending Queue(42)
sending Queue(43)
sending Queue(44)
sending Flush
sending Queue(45)
receiving the batch data Vector(42, 43, 44)
receiving the batch data Vector(45)
Where Can I Find The Code Examples?
I will be augmenting this GitHub repo with the example projects as I move through this series
https://github.com/sachabarber/SachaBarber.AkkaExamples