Last time we looked at Akka Clustering; this time we will look at routing.
Routing allows messages to be routed to one or more actors, known as routees, by sending the messages to a router that knows how to route them to the routees.
Akka comes with quite a few inbuilt routing strategies that we can make use of. We will look at these next.
Types Of Routing Strategy
Akka comes with a whole bunch of inbuilt routing strategies such as:
RoundRobin: Routes in a round-robin fashion to its routees.
Random: This router type selects one of its routees randomly for each message.
SmallestMailBox: A router that tries to send to the non-suspended child routee with the fewest messages in its mailbox. The selection is done in this order:
1. Pick any idle routee (not processing a message) with an empty mailbox.
2. Pick any routee with an empty mailbox.
3. Pick the routee with the fewest pending messages in its mailbox.
4. Pick any remote routee; remote actors are considered lowest priority, since their mailbox size is unknown.
Broadcast: A broadcast router forwards the message it receives to all its routees.
ScatterGatherFirstCompleted: The ScatterGatherFirstCompletedRouter will send the message on to all its routees. It then waits for the first reply it gets back. This result will be sent back to the original sender. Other replies are discarded.
TailChopping: The TailChoppingRouter will first send the message to one, randomly picked, routee and then, after a small delay, to a second routee (picked randomly from the remaining routees) and so on. It waits for the first reply it gets back and forwards it back to the original sender. Other replies are discarded. The goal of this router is to decrease latency by performing redundant queries to multiple routees, assuming that one of the other actors may still be faster to respond than the initial one.
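To make the SmallestMailBox selection order a little more concrete, here is a minimal, library-free sketch of the four steps. It is only a simplified model: RouteeInfo and SmallestMailboxModel are hypothetical names invented for this illustration, it ignores suspended routees, and it is not how Akka implements the strategy internally.

```scala
// Simplified model of the SmallestMailBox selection order.
// RouteeInfo and SmallestMailboxModel are hypothetical, not part of Akka.
final case class RouteeInfo(
    name: String,
    isRemote: Boolean,     // a remote mailbox cannot be inspected
    isProcessing: Boolean, // currently handling a message
    mailboxSize: Int)      // pending messages (meaningless for remote routees)

object SmallestMailboxModel {
  def select(routees: Vector[RouteeInfo]): Option[RouteeInfo] = {
    val local = routees.filterNot(_.isRemote)
    local.find(r => !r.isProcessing && r.mailboxSize == 0) // 1. idle with empty mailbox
      .orElse(local.find(_.mailboxSize == 0))              // 2. any with empty mailbox
      .orElse(local.sortBy(_.mailboxSize).headOption)      // 3. fewest pending messages
      .orElse(routees.find(_.isRemote))                    // 4. any remote routee
  }
}

object SmallestMailboxModelExample extends App {
  val routees = Vector(
    RouteeInfo("a", isRemote = false, isProcessing = true, mailboxSize = 0),
    RouteeInfo("b", isRemote = false, isProcessing = false, mailboxSize = 0),
    RouteeInfo("c", isRemote = true, isProcessing = false, mailboxSize = 0))

  // "a" is busy, so the first idle routee with an empty mailbox is "b"
  println(SmallestMailboxModel.select(routees).map(_.name))
}
```

Note that in this model the first routee in the list wins whenever several routees are equally good candidates.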
Regular Actor As A Router
Akka allows you to create routers in 2 ways. The first way is to use RoutingLogic to set up your router.
There are quite a few specializations of RoutingLogic, such as:
RoundRobinRoutingLogic
RandomRoutingLogic
SmallestMailboxRoutingLogic
BroadcastRoutingLogic
You would typically use this in a regular actor. The actor in which you use the RoutingLogic would be the router. If you go down this path, you would be responsible for managing the router’s children, i.e., the routees. That means you would be responsible for managing ALL aspects of the routees, including adding them to a list of available routees and watching them for Termination so that you can remove them from that list (which sounds a lot like supervision, doesn’t it).
Here is what a skeleton for an actor that is set up manually as a router may look like:
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{Actor, Props, Terminated}
import akka.routing.{ActorRefRoutee, Router, RoutingLogic}

// WorkMessage and Report are message objects from the demo project (not shown here)
class RouterActor(val routingLogic: RoutingLogic) extends Actor {

  // Gives every worker a unique id and name
  val counter: AtomicInteger = new AtomicInteger()

  // Create the initial 5 routees as children and watch them for termination
  val routees = Vector.fill(5) {
    val workerCount = counter.getAndIncrement()
    val r = context.actorOf(Props(
      new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
    context watch r
    ActorRefRoutee(r)
  }

  // Router is immutable, so adding or removing a routee yields a new Router
  var router = Router(routingLogic, routees)

  def receive = {
    case WorkMessage =>
      router.route(WorkMessage, sender())
    case Report =>
      // Use router.routees so that replacement workers also get the Report
      router.routees.foreach(ref => ref.send(Report, sender()))
    case Terminated(a) =>
      // Swap the terminated routee for a freshly created, watched worker
      router = router.removeRoutee(a)
      val workerCount = counter.getAndIncrement()
      val r = context.actorOf(Props(
        new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
      context watch r
      router = router.addRoutee(r)
  }
}
It can be seen that I pass in the RoutingLogic, which would be one of the available RoutingLogic strategies that Akka comes with.
The other thing to note is that, as we stated earlier, we need to FULLY manage the collection of routee actors ourselves, including watching them for Termination.
Surely there is a better way?
Well yes, thankfully there is. Akka also provides a Pool for this job. We will look at that next.
Pool
Akka comes with the ability to create a router using a pool where we tell it what actors we want to use as the routees, how many routees we want, and how the supervision should be handled.
Here is some of the demo code. It uses 2 utility methods to create a pool-based router whose routees are simple FibonacciActor instances. Messages are sent to the router via a container actor, which creates the router from the pool Props it is given.
def RunTailChoppingPoolDemo(): Unit = {

  // Restart any routee that fails, instead of escalating to the router's parent
  val supervisionStrategy = OneForOneStrategy() {
    case _ => SupervisorStrategy.Restart
  }

  val props = TailChoppingPool(5, within = 10.seconds,
    supervisorStrategy = supervisionStrategy, interval = 20.millis).
    props(Props[FibonacciActor])

  RunPoolDemo(props)
}

def RunPoolDemo(props: Props): Unit = {
  val system = ActorSystem("RoutingSystem")
  val actorRef = system.actorOf(Props(
    new PoolRouterContainerActor(props, "theRouter")), name = "thePoolContainer")
  actorRef ! WorkMessage

  // Keep the system alive until Enter is pressed
  StdIn.readLine()
  system.terminate()
}
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

class PoolRouterContainerActor(val props: Props, val name: String) extends Actor {

  // The pool router is created as a child of this actor from the supplied Props
  val router: ActorRef = context.actorOf(props, name)

  def receive = {
    case WorkMessage =>
      implicit val timeout: Timeout = Timeout(5.seconds)
      val futureResult = router ? FibonacciNumber(10)
      // Blocking with Await inside an actor keeps the demo simple; avoid it in real code
      val (actName, result) = Await.result(futureResult, timeout.duration)
      println(s"FibonacciActor : ($actName) came back with result -> $result")
  }
}
import akka.actor.Actor
import scala.annotation.tailrec

class FibonacciActor extends Actor {

  val actName = self.path.name

  def receive = {
    case FibonacciNumber(nbr) =>
      println(s"FibonacciActor : ($actName) -> " +
        s"has been asked to calculate FibonacciNumber")
      val result = fibonacci(nbr)
      // Double parentheses send the tuple explicitly, without relying on auto-tupling
      sender() ! ((actName, result))
  }

  private def fibonacci(n: Int): Int = {
    // a is the current Fibonacci number, b the next one
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ => fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }
}
Supervision Using Pool
Routees that are created by a pool router will be created as the router’s children. The router is therefore also the children’s supervisor.
The supervision strategy of the router actor can be configured with the supervisorStrategy property of the Pool. If no configuration is provided, routers default to a strategy of “always escalate”. This means that errors are passed up to the router’s supervisor for handling. The router’s supervisor will decide what to do about any errors.
Note the router’s supervisor will treat the error as an error with the router itself. Therefore a directive to stop or restart will cause the router itself to stop or restart. The router, in turn, will cause its children to stop and restart.
It should be mentioned that the router’s restart behavior has been overridden so that a restart, while still re-creating the children, will still preserve the same number of actors in the pool.
This means that if you have not specified supervisorStrategy of the router or its parent, a failure in a routee will escalate to the parent of the router, which will by default restart the router, which will restart all routees (it uses Escalate and does not stop routees during restart). The reason is to make the default behave such that adding withRouter to a child’s definition does not change the supervision strategy applied to the child. This might be an inefficiency that you can avoid by specifying the strategy when defining the router.
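As a rough illustration of specifying the strategy when defining the router, the sketch below gives a RoundRobinPool its own OneForOneStrategy so that a failing routee is restarted on its own rather than the failure being escalated to the router's parent. FlakyWorker and SupervisedPoolSketch are hypothetical names used only for this example, and the choice of which exceptions to restart on is an assumption, not a recommendation.

```scala
import akka.actor.{Actor, OneForOneStrategy, Props, SupervisorStrategy}
import akka.routing.RoundRobinPool

// Hypothetical routee that fails on demand
class FlakyWorker extends Actor {
  def receive = {
    case "boom" => throw new IllegalStateException("simulated failure")
    case msg    => sender() ! msg
  }
}

object SupervisedPoolSketch {
  // Applied by the router to its routees: only the failing routee is restarted,
  // anything unexpected is still escalated to the router's parent
  val routeeStrategy: SupervisorStrategy = OneForOneStrategy() {
    case _: IllegalStateException => SupervisorStrategy.Restart
    case _: Exception             => SupervisorStrategy.Escalate
  }

  val pool: RoundRobinPool = RoundRobinPool(5, supervisorStrategy = routeeStrategy)

  // Props for a router with 5 FlakyWorker routees
  val props: Props = pool.props(Props[FlakyWorker]())
}
```

The resulting props can be passed to context.actorOf or system.actorOf in the same way as the pool Props shown earlier.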
Group
You may also wish to create your routees separately and let the router know about them. This is achievable using Groups. This is not something I decided to cover in this post, but if this sounds of interest to you, you can read more about it in the official Akka documentation.
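For a flavour of what a group looks like, here is a very small sketch, not a full treatment. It assumes the routees are created directly under the user guardian so that their paths are /user/w1 to /user/w3; EchoWorker, GroupSketch and the actor names are hypothetical and chosen only for this illustration.

```scala
import akka.actor.{Actor, ActorRef, ActorSystem, Props}
import akka.routing.RoundRobinGroup

// Hypothetical routee that just prints what it receives
class EchoWorker extends Actor {
  def receive = {
    case msg => println(s"${self.path.name} got $msg")
  }
}

object GroupSketch {
  // The router only knows the routees by their actor paths
  val paths = List("/user/w1", "/user/w2", "/user/w3")

  def start(system: ActorSystem): ActorRef = {
    // We create the routees ourselves, so supervising them is our job, not the router's
    (1 to 3).foreach(i => system.actorOf(Props[EchoWorker](), s"w$i"))
    system.actorOf(RoundRobinGroup(paths).props(), "groupRouter")
  }
}
```

Calling GroupSketch.start(system) returns the router's ActorRef, and messages sent to it are delivered to the three workers in round-robin order.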
Routing Strategy Demos
For the demos, I am using a mixture of RoutingLogic hosted in my own actor, and also Pool-based routers.
Here is the basic setup for a RoutingLogic-based actor of my own, where I have to manage all supervision concerns manually.
There are ALWAYS 5 routees involved with this demo.
import akka.actor._
import akka.routing._
import scala.io.StdIn

object Demo extends App {

  // Swap in RandomRoutingLogic(), SmallestMailboxRoutingLogic() or
  // BroadcastRoutingLogic() to run the other demos
  RunRoutingDemo(RoundRobinRoutingLogic())

  def RunRoutingDemo(routingLogic: RoutingLogic): Unit = {
    val system = ActorSystem("RoutingSystem")
    val actorRef = system.actorOf(Props(
      new RouterActor(routingLogic)), name = "theRouter")

    // Send 10 messages, one per second
    for (i <- 0 until 10) {
      actorRef ! WorkMessage
      Thread.sleep(1000)
    }
    actorRef ! Report

    StdIn.readLine()
    system.terminate()
  }
}
Where we make use of the following generic actor code that uses the specific RoutingLogic that is passed in.
import java.util.concurrent.atomic.AtomicInteger
import akka.actor.{Actor, Props, Terminated}
import akka.routing.{ActorRefRoutee, Router, RoutingLogic}

// WorkMessage and Report are message objects from the demo project (not shown here)
class RouterActor(val routingLogic: RoutingLogic) extends Actor {

  // Gives every worker a unique id and name
  val counter: AtomicInteger = new AtomicInteger()

  // Create the initial 5 routees as children and watch them for termination
  val routees = Vector.fill(5) {
    val workerCount = counter.getAndIncrement()
    val r = context.actorOf(Props(
      new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
    context watch r
    ActorRefRoutee(r)
  }

  // Router is immutable, so adding or removing a routee yields a new Router
  var router = Router(routingLogic, routees)

  def receive = {
    case WorkMessage =>
      router.route(WorkMessage, sender())
    case Report =>
      // Use router.routees so that replacement workers also get the Report
      router.routees.foreach(ref => ref.send(Report, sender()))
    case Terminated(a) =>
      // Swap the terminated routee for a freshly created, watched worker
      router = router.removeRoutee(a)
      val workerCount = counter.getAndIncrement()
      val r = context.actorOf(Props(
        new WorkerActor(workerCount)), name = s"workerActor-$workerCount")
      context watch r
      router = router.addRoutee(r)
  }
}
This is what the routees look like for this set of demos:
import akka.actor.Actor

class WorkerActor(val id: Int) extends Actor {

  // Number of WorkMessages this worker has handled so far
  var msgCount = 0
  val actName = self.path.name

  def receive = {
    case WorkMessage =>
      msgCount += 1
      println(s"worker : {$id}, name : ($actName) -> ($msgCount)")
    case Report =>
      println(s"worker : {$id}, name : ($actName) -> saw total messages : ($msgCount)")
    case _ => println("unknown message")
  }
}
Ok, so let's have a look at some examples of using this code, shall we:
RoundRobin
We get this output, where the messages are handed to the routees in round-robin order:
worker : {0}, name : (workerActor-0) -> (1)
worker : {1}, name : (workerActor-1) -> (1)
worker : {2}, name : (workerActor-2) -> (1)
worker : {3}, name : (workerActor-3) -> (1)
worker : {4}, name : (workerActor-4) -> (1)
worker : {0}, name : (workerActor-0) -> (2)
worker : {1}, name : (workerActor-1) -> (2)
worker : {2}, name : (workerActor-2) -> (2)
worker : {3}, name : (workerActor-3) -> (2)
worker : {4}, name : (workerActor-4) -> (2)
worker : {0}, name : (workerActor-0) -> saw total messages : (2)
worker : {1}, name : (workerActor-1) -> saw total messages : (2)
worker : {2}, name : (workerActor-2) -> saw total messages : (2)
worker : {4}, name : (workerActor-4) -> saw total messages : (2)
worker : {3}, name : (workerActor-3) -> saw total messages : (2)
Random
We get this output, where the messages are sent to routees randomly:
worker : {1}, name : (workerActor-1) -> (1)
worker : {1}, name : (workerActor-1) -> (2)
worker : {4}, name : (workerActor-4) -> (1)
worker : {0}, name : (workerActor-0) -> (1)
worker : {0}, name : (workerActor-0) -> (2)
worker : {2}, name : (workerActor-2) -> (1)
worker : {3}, name : (workerActor-3) -> (1)
worker : {4}, name : (workerActor-4) -> (2)
worker : {0}, name : (workerActor-0) -> (3)
worker : {0}, name : (workerActor-0) -> (4)
worker : {1}, name : (workerActor-1) -> saw total messages : (2)
worker : {0}, name : (workerActor-0) -> saw total messages : (4)
worker : {2}, name : (workerActor-2) -> saw total messages : (1)
worker : {4}, name : (workerActor-4) -> saw total messages : (2)
worker : {3}, name : (workerActor-3) -> saw total messages : (1)
SmallestMailBox
We get this output, where the routee with the smallest mailbox will get the message sent to it. This example may look a bit weird, but if you think about it, by the time the new message is sent the 1st routee (workerActor-0) will have dealt with the 1st message and is ready to receive a new one. Since it’s the 1st routee in the list, it is still considered the one with the smallest mailbox. If you introduced an artificial delay in the actor dealing with the message, it may show different, more interesting results.
worker : {0}, name : (workerActor-0) -> (1)
worker : {0}, name : (workerActor-0) -> (2)
worker : {0}, name : (workerActor-0) -> (3)
worker : {0}, name : (workerActor-0) -> (4)
worker : {0}, name : (workerActor-0) -> (5)
worker : {0}, name : (workerActor-0) -> (6)
worker : {0}, name : (workerActor-0) -> (7)
worker : {0}, name : (workerActor-0) -> (8)
worker : {0}, name : (workerActor-0) -> (9)
worker : {0}, name : (workerActor-0) -> (10)
worker : {2}, name : (workerActor-2) -> saw total messages : (0)
worker : {4}, name : (workerActor-4) -> saw total messages : (0)
worker : {1}, name : (workerActor-1) -> saw total messages : (0)
worker : {0}, name : (workerActor-0) -> saw total messages : (10)
worker : {3}, name : (workerActor-3) -> saw total messages : (0)
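If you want to try the artificial delay mentioned above, one possible sketch is a slower variant of the worker. SlowWorkerActor and its delayMillis parameter are hypothetical, the 1500 ms default is an arbitrary value picked only because it is longer than the demo's 1 second gap between sends, and WorkMessage and Report are redefined here purely so the snippet stands alone. The output depends on timing, so none is shown.

```scala
import akka.actor.Actor

// Stand-ins for the demo's message objects, redefined only to keep
// this sketch self-contained
case object WorkMessage
case object Report

// Hypothetical variant of WorkerActor: each WorkMessage takes delayMillis to
// process, so the worker may still be busy when the next message is routed
class SlowWorkerActor(val id: Int, val delayMillis: Long = 1500L) extends Actor {

  var msgCount = 0
  val actName = self.path.name

  def receive = {
    case WorkMessage =>
      Thread.sleep(delayMillis) // artificial delay; blocking is for demo purposes only
      msgCount += 1
      println(s"worker : {$id}, name : ($actName) -> ($msgCount)")
    case Report =>
      println(s"worker : {$id}, name : ($actName) -> saw total messages : ($msgCount)")
  }
}
```

To use it, the RouterActor would need to create SlowWorkerActor children instead of WorkerActor ones. Routees that are still processing should then be passed over in favour of idle ones.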
Broadcast
We get this output, where each routee should see ALL messages:
worker : {0}, name : (workerActor-0) -> (1)
worker : {2}, name : (workerActor-2) -> (1)
worker : {4}, name : (workerActor-4) -> (1)
worker : {3}, name : (workerActor-3) -> (1)
worker : {1}, name : (workerActor-1) -> (1)
worker : {0}, name : (workerActor-0) -> (2)
worker : {1}, name : (workerActor-1) -> (2)
worker : {4}, name : (workerActor-4) -> (2)
worker : {2}, name : (workerActor-2) -> (2)
worker : {3}, name : (workerActor-3) -> (2)
worker : {0}, name : (workerActor-0) -> (3)
worker : {2}, name : (workerActor-2) -> (3)
worker : {3}, name : (workerActor-3) -> (3)
worker : {4}, name : (workerActor-4) -> (3)
worker : {1}, name : (workerActor-1) -> (3)
worker : {1}, name : (workerActor-1) -> (4)
worker : {4}, name : (workerActor-4) -> (4)
worker : {3}, name : (workerActor-3) -> (4)
worker : {0}, name : (workerActor-0) -> (4)
worker : {2}, name : (workerActor-2) -> (4)
worker : {0}, name : (workerActor-0) -> (5)
worker : {1}, name : (workerActor-1) -> (5)
worker : {4}, name : (workerActor-4) -> (5)
worker : {2}, name : (workerActor-2) -> (5)
worker : {3}, name : (workerActor-3) -> (5)
worker : {3}, name : (workerActor-3) -> (6)
worker : {2}, name : (workerActor-2) -> (6)
worker : {1}, name : (workerActor-1) -> (6)
worker : {4}, name : (workerActor-4) -> (6)
worker : {0}, name : (workerActor-0) -> (6)
worker : {1}, name : (workerActor-1) -> (7)
worker : {0}, name : (workerActor-0) -> (7)
worker : {4}, name : (workerActor-4) -> (7)
worker : {2}, name : (workerActor-2) -> (7)
worker : {3}, name : (workerActor-3) -> (7)
worker : {0}, name : (workerActor-0) -> (8)
worker : {3}, name : (workerActor-3) -> (8)
worker : {1}, name : (workerActor-1) -> (8)
worker : {2}, name : (workerActor-2) -> (8)
worker : {4}, name : (workerActor-4) -> (8)
worker : {2}, name : (workerActor-2) -> (9)
worker : {3}, name : (workerActor-3) -> (9)
worker : {4}, name : (workerActor-4) -> (9)
worker : {1}, name : (workerActor-1) -> (9)
worker : {0}, name : (workerActor-0) -> (9)
worker : {0}, name : (workerActor-0) -> (10)
worker : {2}, name : (workerActor-2) -> (10)
worker : {1}, name : (workerActor-1) -> (10)
worker : {4}, name : (workerActor-4) -> (10)
worker : {3}, name : (workerActor-3) -> (10)
worker : {1}, name : (workerActor-1) -> saw total messages : (10)
worker : {2}, name : (workerActor-2) -> saw total messages : (10)
worker : {0}, name : (workerActor-0) -> saw total messages : (10)
worker : {3}, name : (workerActor-3) -> saw total messages : (10)
worker : {4}, name : (workerActor-4) -> saw total messages : (10)
So that about covers the demos I have created for using your own actor with RoutingLogic. Let's now look at using pools. As I have stated already, pools take care of supervision for us, so we don’t have to manually take care of that any more.
As before, I have a helper actor to work with the pool. It accepts the router Props and creates the router, which will receive the messages to send to its routees.
Here is the demo code:
import akka.actor._
import akka.routing._
import scala.concurrent.duration._
import scala.io.StdIn

object Demo extends App {

  // Call RunTailChoppingPoolDemo() instead to run the other pool demo
  RunScatterGatherFirstCompletedPoolDemo()

  def RunScatterGatherFirstCompletedPoolDemo(): Unit = {

    // Restart any routee that fails, instead of escalating to the router's parent
    val supervisionStrategy = OneForOneStrategy() {
      case _ => SupervisorStrategy.Restart
    }

    val props = ScatterGatherFirstCompletedPool(
      5, supervisorStrategy = supervisionStrategy, within = 10.seconds).
      props(Props[FibonacciActor])

    RunPoolDemo(props)
  }

  def RunTailChoppingPoolDemo(): Unit = {

    val supervisionStrategy = OneForOneStrategy() {
      case _ => SupervisorStrategy.Restart
    }

    val props = TailChoppingPool(5, within = 10.seconds,
      supervisorStrategy = supervisionStrategy, interval = 20.millis).
      props(Props[FibonacciActor])

    RunPoolDemo(props)
  }

  def RunPoolDemo(props: Props): Unit = {
    val system = ActorSystem("RoutingSystem")
    val actorRef = system.actorOf(Props(
      new PoolRouterContainerActor(props, "theRouter")), name = "thePoolContainer")
    actorRef ! WorkMessage

    // Keep the system alive until Enter is pressed
    StdIn.readLine()
    system.terminate()
  }
}
And here is the helper actor:
import akka.actor._
import akka.pattern.ask
import akka.util.Timeout
import scala.concurrent.Await
import scala.concurrent.duration._

class PoolRouterContainerActor(val props: Props, val name: String) extends Actor {

  // The pool router is created as a child of this actor from the supplied Props
  val router: ActorRef = context.actorOf(props, name)

  def receive = {
    case WorkMessage =>
      implicit val timeout: Timeout = Timeout(5.seconds)
      val futureResult = router ? FibonacciNumber(10)
      // Blocking with Await inside an actor keeps the demo simple; avoid it in real code
      val (actName, result) = Await.result(futureResult, timeout.duration)
      println(s"FibonacciActor : ($actName) came back with result -> $result")
  }
}
As before we will use 5 routees.
This is what the routees look like for the pool demo:
import akka.actor.Actor
import scala.annotation.tailrec

class FibonacciActor extends Actor {

  val actName = self.path.name

  def receive = {
    case FibonacciNumber(nbr) =>
      println(s"FibonacciActor : ($actName) -> " +
        s"has been asked to calculate FibonacciNumber")
      val result = fibonacci(nbr)
      // Double parentheses send the tuple explicitly, without relying on auto-tupling
      sender() ! ((actName, result))
  }

  private def fibonacci(n: Int): Int = {
    // a is the current Fibonacci number, b the next one
    @tailrec
    def fib(n: Int, b: Int, a: Int): Int = n match {
      case 0 => a
      case _ => fib(n - 1, a + b, b)
    }
    fib(n, 1, 0)
  }
}
ScatterGatherFirstCompletedPool
Here is the output when we run this. It can be seen that we simply get the results from the routee that completed first.
FibonacciActor : ($d) -> has been asked to calculate FibonacciNumber
FibonacciActor : ($e) -> has been asked to calculate FibonacciNumber
FibonacciActor : ($a) -> has been asked to calculate FibonacciNumber
FibonacciActor : ($c) -> has been asked to calculate FibonacciNumber
FibonacciActor : ($b) -> has been asked to calculate FibonacciNumber
FibonacciActor : ($d) came back with result -> 55
TailChoppingPool
Here is the output when we run this. It can be seen that we simply get the results from the routee that completed first, out of the few routees that the message was sent to:
FibonacciActor : ($b) -> has been asked to calculate FibonacciNumber
FibonacciActor : ($b) came back with result -> 55
What About a Custom Routing Strategy?
Akka allows you to create your own routing strategy by writing a class that extends the inbuilt Akka RoutingLogic. You can read more about this in the official Akka documentation.
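As a small taste of what that might look like, here is a sketch of a custom RoutingLogic that delivers each message to several routees picked in round-robin order. RedundantRoutingLogic and its copies parameter are names made up for this illustration. It builds on RoundRobinRoutingLogic, SeveralRoutees and NoRoutee from akka.routing, and should be treated as a starting point rather than a finished strategy.

```scala
import akka.routing.{NoRoutee, RoundRobinRoutingLogic, Routee, RoutingLogic, SeveralRoutees}
import scala.collection.immutable

// Hypothetical custom logic: each message goes to `copies` routees,
// chosen by delegating to the inbuilt round-robin logic
class RedundantRoutingLogic(copies: Int) extends RoutingLogic {

  private val roundRobin = RoundRobinRoutingLogic()

  override def select(message: Any, routees: immutable.IndexedSeq[Routee]): Routee =
    if (routees.isEmpty) NoRoutee
    else {
      // Ask the round-robin logic for the next routee, `copies` times over
      val targets = (1 to copies).map(_ => roundRobin.select(message, routees))
      SeveralRoutees(targets)
    }
}
```

Because the RouterActor shown earlier accepts any RoutingLogic, this could be tried out with Props(new RouterActor(new RedundantRoutingLogic(2))).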
Where Can I Find the Code Examples?
I will be augmenting this GitHub repo with the example projects as I move through this series: