As with most things in Akka, you can configure persistence either in code (by overriding things) or through configuration. I personally prefer configuration. So, in order to get persistence to work, one must provide the following configuration:
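As a rough sketch of what such settings might look like, assuming the LevelDB journal and the local snapshot store plugins that ship with Akka Persistence 2.4: the directory paths, the object name and the system name below are placeholders of mine, not values from this article. The same keys could equally live in `application.conf`; they are parsed from a string here only to keep the example in one place.

```scala
import akka.actor.ActorSystem
import com.typesafe.config.ConfigFactory

// Hypothetical helper object, for illustration only.
object PersistenceConfigSketch {

  // Assumed plugin choices: LevelDB journal + local file snapshot store.
  // The "target/example/..." paths are placeholders.
  val config = ConfigFactory.parseString(
    """
      |akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
      |akka.persistence.journal.leveldb.dir = "target/example/journal"
      |akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
      |akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
    """.stripMargin)
    .withFallback(ConfigFactory.load()) // keep Akka's reference defaults for everything else

  // Creates an actor system that uses the settings above.
  def start(): ActorSystem = ActorSystem("ConfiguredSystem", config)
}
```

Note that the LevelDB journal also needs the LevelDB libraries on the classpath, so a build using this sketch would have to declare them as dependencies.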
OK, so getting back on track: how do we create a persistent actor in Akka?
I personally find the fact that a command is expected, and that we end up mixing in a trait named "Eventsourced", to be quite a strong indicator of just how similar working with Akka persistence is to CQRS and traditional event sourcing ideas.
If we were to go further down the rabbit hole and keep looking into the base trait Eventsourced, we would see a couple more abstract methods of interest that are expected to be supplied by the end user's code:
As stated, these are abstract methods that YOUR code needs to supply for the persistent actor to work properly. We will see more of this in a bit.
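In the demo later in this post the two methods in question are receiveRecover and receiveCommand. To make their shape concrete without any Akka dependency, here is a simplified stand-in of my own (EventsourcedLike and Tally are hypothetical names, and this is not the actual Akka source), showing the members user code has to fill in:

```scala
object EventsourcedShape {

  // Same shape as Akka's Actor.Receive type alias
  type Receive = PartialFunction[Any, Unit]

  // Hypothetical stand-in: only the members that user code must supply
  trait EventsourcedLike {
    def persistenceId: String   // stable identity of the persistent entity
    def receiveRecover: Receive // handles replayed events during recovery
    def receiveCommand: Receive // handles normal incoming messages
  }

  // Hypothetical implementation that keeps a running total
  final class Tally extends EventsourcedLike {
    var total = 0
    val persistenceId = "tally-1"
    val receiveRecover: Receive = { case n: Int => total += n }
    val receiveCommand: Receive = { case "reset" => total = 0 }
  }
}
```

The real trait does a great deal more (journaling, stashing, recovery state), but from the user's side the contract is essentially these handlers plus the ID.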
One of the rules you must follow when using persistent actors is that a persistent actor MUST keep the same ID across incarnations, since the ID is what ties the actor to its stored events. This is set using the persistenceId method, as shown below:
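One way to keep the ID stable is to derive it from a key that is passed to the actor rather than from anything generated at runtime. A minimal sketch, assuming a made-up BasketActor and basketId parameter (neither appears in the original code):

```scala
import akka.persistence.PersistentActor

// Hypothetical actor: the entity key is passed in, so every incarnation
// created for the same key reports the same persistenceId.
class BasketActor(basketId: String) extends PersistentActor {

  override def persistenceId: String = s"basket-$basketId"

  override def receiveRecover: Receive = {
    case _ => // replayed events would be applied to state here
  }

  override def receiveCommand: Receive = {
    case _ => // commands would be validated and persisted here
  }
}
```

Such an actor could be created with `Props(classOf[BasketActor], "42")`, and recreating it later with the same key would replay the same journal entries.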
As I stated earlier, snapshots can reduce the number of events that need to be replayed against the event source target (the persistent actor).
Special care must be taken when shutting down persistent actors from outside. For a non-persistent actor a PoisonPill may be used, but this is not recommended for persistent actors. While a persistent actor waits for the journal to confirm that events have been stored, incoming commands are drained from the mailbox into an internal stash, so the actor may handle the PoisonPill before the stashed commands and stop prematurely. A better way is to use explicit shutdown messages.
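A minimal sketch of the explicit shutdown approach, assuming a user-defined Shutdown message and a made-up SafePersistentActor (both names are illustrative). Because Shutdown is an ordinary message, it goes through the same stash as the commands ahead of it and is only handled after they are:

```scala
import akka.persistence.PersistentActor

// Hypothetical user-defined shutdown message
case object Shutdown

class SafePersistentActor extends PersistentActor {

  override def persistenceId: String = "safe-actor"

  override def receiveCommand: Receive = {
    case c: String =>
      println(c)
      // The handler runs once the journal has confirmed the write
      persist(s"handle-$c") { stored => println(stored) }
    case Shutdown =>
      // Handled in order, after any commands stashed ahead of it
      context.stop(self)
  }

  override def receiveRecover: Receive = {
    case _ => // replayed events would be handled here
  }
}
```

The caller would then send `persistentActor ! Shutdown` instead of `persistentActor ! PoisonPill`.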
So we have talked about events, commands, CQRS, event sourcing and snapshots, but so far we have not talked about how to actually save state. How is this done?
Well, as luck would have it, it's very easy: we simply call the persist method, which looks like this:
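In Akka 2.4 the method is declared as `persist[A](event: A)(handler: A => Unit): Unit`: the first argument list takes the event to store, the second takes a handler that runs once the event has been written. A minimal sketch of calling it, assuming a made-up CounterActor and Incremented event (neither is part of the original code):

```scala
import akka.persistence.PersistentActor

// Hypothetical event type
final case class Incremented(by: Int)

class CounterActor extends PersistentActor {

  override def persistenceId: String = "counter-1"

  private var total = 0

  // Rebuild state from replayed events
  override def receiveRecover: Receive = {
    case Incremented(by) => total += by
  }

  override def receiveCommand: Receive = {
    case by: Int =>
      // State is only changed inside the handler, i.e. after the
      // journal has confirmed that the event was stored.
      persist(Incremented(by)) { evt =>
        total += evt.by
        sender() ! total
      }
  }
}
```

Mutating state only inside the handler keeps the in-memory state consistent with what a later replay of the journal would produce.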
OK, so we have now gone through most of the bits you need to work with persistent actors. Time for a demo.
Let's assume we have the following command and event types, where commands are sent to a single persistent actor and events are what it persists:
case class Cmd(data: String)
case class Evt(data: String)
We would hold this type of state within the persistent actor:
case class ExampleState(events: List[String] = Nil) {
  def updated(evt: Evt): ExampleState = copy(evt.data :: events)
  def size: Int = events.length
  override def toString: String = events.reverse.toString
}
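To see how this state class behaves on its own, here is a small self-contained sketch (the wrapping object and the sample values are mine; the two case classes are copied from above). New events are prepended to the list, and toString reverses it so that events print oldest first:

```scala
object ExampleStateSketch {

  case class Evt(data: String)

  case class ExampleState(events: List[String] = Nil) {
    def updated(evt: Evt): ExampleState = copy(evt.data :: events)
    def size: Int = events.length
    override def toString: String = events.reverse.toString
  }

  def main(args: Array[String]): Unit = {
    val state = ExampleState()
      .updated(Evt("foo-0"))
      .updated(Evt("foo-1"))

    println(state.events) // internal order, newest first: List(foo-1, foo-0)
    println(state)        // printed oldest first: List(foo-0, foo-1)
    println(state.size)   // 2
  }
}
```

Since updated returns a new ExampleState rather than mutating the old one, the same value can safely be handed to saveSnapshot while the actor carries on.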
And the actual persistent actor looks like this:
import akka.actor._
import akka.persistence._

class DemoPersistentActor extends PersistentActor {

  // Current in-memory state, rebuilt from the snapshot and journal on recovery
  var state = ExampleState()

  def updateState(event: Evt): Unit =
    state = state.updated(event)

  def numEvents =
    state.size

  // Invoked during recovery with any offered snapshot and the replayed events
  val receiveRecover: Receive = {
    case evt: Evt => updateState(evt)
    case SnapshotOffer(_, snapshot: ExampleState) =>
      println(s"offered state = $snapshot")
      state = snapshot
  }

  // Invoked for normal incoming messages once recovery has completed
  val receiveCommand: Receive = {
    case Cmd(data) =>
      persist(Evt(s"${data}-${numEvents}"))(updateState)
      persist(Evt(s"${data}-${numEvents + 1}")) { event =>
        updateState(event)
        context.system.eventStream.publish(event)
      }
    case "snap" => saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>
      println(s"SaveSnapshotSuccess(metadata) : metadata=$metadata")
    case SaveSnapshotFailure(metadata, reason) =>
      // The s prefix is needed for $metadata and $reason to be interpolated
      println(s"SaveSnapshotFailure(metadata, reason) : metadata=$metadata, reason=$reason")
    case "print" => println(state)
    case "boom" => throw new Exception("boom")
  }

  // Must stay the same across incarnations so the right events are replayed
  override def persistenceId = "demo-persistent-actor-1"
}
Which we could first run using this demo code:
import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  val system = ActorSystem("PersitenceSystem")
  val persistentActor =
    system.actorOf(Props(classOf[DemoPersistentActor]),
      "demo-persistent-actor-1")

  persistentActor ! "print"
  persistentActor ! Cmd("foo")
  persistentActor ! Cmd("baz")
  persistentActor ! "boom"
  persistentActor ! Cmd("bar")
  persistentActor ! "snap"
  persistentActor ! Cmd("buzz")
  persistentActor ! "print"

  // Wait for ENTER, then shut the actor system down
  StdIn.readLine()
  system.terminate()
  StdIn.readLine()
}
Which gives the following output (your output may look a little different from mine, as I have run this code a number of times and so have state from previous runs on disk):
offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8,
foo-9, baz-10, baz-11, bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21)
List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23)
[ERROR] [08/16/2016 18:43:21.475] [PersitenceSystem-akka.actor.default-dispatcher-5]
[akka://PersitenceSystem/user/demo-persistent-actor-1] boom
java.lang.Exception: boom
at DemoPersistentActor$$anonfun$2.applyOrElse(DemoPersistentActor.scala:39)
at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
at DemoPersistentActor.akka$persistence$Eventsourced$$super$aroundReceive(DemoPersistentActor.scala:6)
at akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:657)
at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:182)
at DemoPersistentActor.aroundReceive(DemoPersistentActor.scala:6)
at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
at akka.actor.ActorCell.invoke(ActorCell.scala:495)
at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
at akka.dispatch.Mailbox.run(Mailbox.scala:224)
at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9,
baz-10, baz-11, bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21)
[WARN] [08/16/2016 18:43:21.494] [PersitenceSystem-akka.persistence.dispatchers.default-stream-dispatcher-8]
[akka.serialization.Serialization(akka://PersitenceSystem)] Using the default Java serializer for class
[ExampleState] which is not recommended because of performance implications. Use another serializer or
disable this warning using the setting 'akka.actor.warn-about-java-serializer-usage'
List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23, foo-24, foo-25,
baz-26, baz-27, bar-28, bar-29, buzz-30, buzz-31)
SaveSnapshotSuccess(metadata) : metadata=SnapshotMetadata(demo-persistent-actor-1,33,1471369401491)
And then if we were to run this demo code:
import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn

object Demo extends App {

  val system = ActorSystem("PersitenceSystem")
  val persistentActor =
    system.actorOf(Props(classOf[DemoPersistentActor]),
      "demo-persistent-actor-1")

  persistentActor ! "print"

  // Wait for ENTER, then shut the actor system down
  StdIn.readLine()
  system.terminate()
  StdIn.readLine()
}
Let's see what we get:
offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7,
foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23,
foo-24, foo-25, baz-26, baz-27, bar-28, bar-29)
List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11,
bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22,
buzz-23, foo-24, foo-25, baz-26, baz-27, bar-28, bar-29, buzz-30, buzz-31)
It can be seen that, although we did not save any new events, the state of the demo persistent actor was restored correctly using a combination of a snapshot and the events that are newer than the snapshot.
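The recovery rule described above can be modelled in a few lines of plain Scala. This is only an illustration of the idea, not how Akka implements it: the recover function and the wrapping object are hypothetical, and the state class is a copy of the one used by the demo actor.

```scala
object RecoverySketch {

  case class Evt(data: String)

  // Copy of the demo's state class
  case class ExampleState(events: List[String] = Nil) {
    def updated(evt: Evt): ExampleState = copy(evt.data :: events)
    def size: Int = events.length
    override def toString: String = events.reverse.toString
  }

  // Start from the latest snapshot (or empty state if there is none),
  // then apply only the events journaled after that snapshot, in order.
  def recover(snapshot: Option[ExampleState], newerEvents: Seq[Evt]): ExampleState =
    newerEvents.foldLeft(snapshot.getOrElse(ExampleState())) { (state, evt) =>
      state.updated(evt)
    }
}
```

In the run above, the snapshot plays the role of the offered state and the two buzz events play the role of newerEvents, which is why the printed list is two entries longer than the offered one.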
Where Is The Code?
As previously stated, all the code for this series will end up in this GitHub repo:
https://github.com/sachabarber/SachaBarber.AkkaExamples