
akka : persistent actors

16 Aug 2016 | CPOL | 7 min read
In this post we will look at some of the core concepts around persistent actors. That may seem strange, since so far I have been saying that Akka is great because it is lock free and each actor is stateless.

The truth is that there are times when some sort of mutable state just can’t be avoided, and for that Akka provides us with the PersistentActor.

Event Sourcing

Akka persistence does borrow a couple of ideas from other concepts out there in the wild, such as

  • Event sourcing
  • Possibly CQRS
  • Snapshots

If you have not heard of event sourcing before, it is fairly simple to explain. The idea is that you have a data structure that is able to receive events to set its internal state. At the same time you also have an event store that stores events in chronological order, as an append-only log.

These events are then played onto the object that accepts them, whereby the object will build its own internal state from the events played on to it.
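
To make the replay idea concrete, here is a minimal sketch in plain Scala (no Akka involved). The account domain and the names Deposited, Withdrawn, Account and replay are made up purely for illustration; the point is only that the current state is the result of folding the stored events, in order, onto an empty starting state.

```scala
object EventSourcingSketch {
  // Hypothetical events for an imaginary bank account
  sealed trait AccountEvent
  final case class Deposited(amount: Int) extends AccountEvent
  final case class Withdrawn(amount: Int) extends AccountEvent

  // The object that "accepts" events and builds its own state from them
  final case class Account(balance: Int = 0) {
    def applyEvent(event: AccountEvent): Account = event match {
      case Deposited(amount) => copy(balance = balance + amount)
      case Withdrawn(amount) => copy(balance = balance - amount)
    }
  }

  // Replaying = applying every stored event, oldest first, to an empty state
  def replay(events: Seq[AccountEvent]): Account =
    events.foldLeft(Account())((account, event) => account.applyEvent(event))
}
```

Calling EventSourcingSketch.replay(Seq(Deposited(100), Withdrawn(30), Deposited(5))) yields an Account with a balance of 75, without that balance ever having been stored directly.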

The eagle-eyed amongst you may be thinking: won’t that lead to loads of events? Well yes, it might.

There is however an optimization around this called “snapshots”. A snapshot is a special type of event that captures the object's state as it is right now.

So in reality you would apply the latest snapshot, and then apply only the events that occurred after it. This drastically reduces the amount of event playback needed when an object rebuilds its state from the event store.
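
As a rough sketch of that recovery order, again in plain Scala rather than Akka: the types Event and Snapshot, the seqNr field and the recover function below are assumptions made for this example, not part of any library. Recovery starts from the snapshot's state (if there is one) and then replays only the events with a higher sequence number.

```scala
object SnapshotReplaySketch {
  // Hypothetical types: every event carries the sequence number it was stored under
  final case class Event(seqNr: Long, data: String)
  // A snapshot remembers the state and the last sequence number it covers
  final case class Snapshot(seqNr: Long, state: List[String])

  def applyEvent(state: List[String], event: Event): List[String] =
    state :+ event.data

  // Start from the snapshot (or empty state), then replay only the newer events
  def recover(snapshot: Option[Snapshot], journal: List[Event]): List[String] = {
    val startState = snapshot.map(_.state).getOrElse(Nil)
    val coveredUpTo = snapshot.map(_.seqNr).getOrElse(0L)
    journal.filter(_.seqNr > coveredUpTo).foldLeft(startState)(applyEvent)
  }
}
```

With a snapshot taken at sequence number 2, only events 3 and 4 are replayed, yet the resulting state is the same as a full replay from an empty state.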

I have written a fairly well received article about this before for the .NET space which also included working code and also includes the CQRS (Command Query Responsibility Segregation) part of it too. If you would like to know more about that you can read about it here:

http://www.codeproject.com/Articles/991648/CQRS-A-Cross-Examination-Of-How-It-Works

Anyway, I am drifting off topic a bit here. The point is that Akka persistence borrows some of these ideas from other concepts/frameworks, so it may be of some use to have a read around them, in particular event sourcing and CQRS.

Dependencies

In order to work with Akka persistence you will need the following 2 SBT entries in your build.sbt file

Scala
libraryDependencies ++= Seq(
  "com.typesafe.akka" %% "akka-actor" % "2.4.8",
  "com.typesafe.akka" %% "akka-persistence" % "2.4.8"
)

Storage

Akka persistence comes with a couple of pre-canned persistence storage mechanisms, and there are many more community based storage frameworks that you can use.

Within this article I have chosen to use the Akka provided storage mechanism which is LevelDB.

In order to do this, you will need the following SBT entries in your build.sbt file

Scala
libraryDependencies ++= Seq(
  "org.iq80.leveldb"            % "leveldb"          % "0.7",
  "org.fusesource.leveldbjni"   % "leveldbjni-all"   % "1.8"
)

You can read more about storage in general at the official Akka docs page

http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#Storage_plugins

Config

As with most things in Akka you are able to configure things either in code (by way of overriding things) or by configuration. I personally prefer configuration. So in order to get the persistence to work, one must provide the following configuration

Scala
akka.persistence.journal.plugin = "akka.persistence.journal.leveldb"
akka.persistence.snapshot-store.plugin = "akka.persistence.snapshot-store.local"
 
akka.persistence.journal.leveldb.dir = "target/example/journal"
akka.persistence.snapshot-store.local.dir = "target/example/snapshots"
 
# DO NOT USE THIS IN PRODUCTION !!!
# See also https://github.com/typesafehub/activator/issues/287
akka.persistence.journal.leveldb.native = false    

For me this is in a Resources/application.conf file.

PersistentActor Trait

OK, so getting back on track: how do we create a persistent actor in Akka?

Well out of the box Akka provides the PersistentActor trait, which you can mix in. This trait looks like this

Scala
trait PersistentActor extends Eventsourced with PersistenceIdentity {
  def receive = receiveCommand
}    

There are several things that are of interest here

  • There is an expectation there will be a receiveCommand implementation (this is within the Eventsourced trait)
  • That the base trait is called Eventsourced is also interesting

I personally find the fact that there is an expectation of a command, and that we end up mixing in a trait named “Eventsourced”, to be quite a strong indicator of just how similar working with Akka persistence is to CQRS + traditional event sourcing ideas.

If we were to go further down the rabbit hole and keep looking into the base trait Eventsourced, we would see a couple more abstract methods of interest that are expected to be supplied by the end user's code:

Scala
/**
 * Recovery handler that receives persisted events during recovery. If a state snapshot
 * has been captured and saved, this handler will receive a [[SnapshotOffer]] message
 * followed by events that are younger than the offered snapshot.
 *
 * This handler must not have side-effects other than changing persistent actor state i.e. it
 * should not perform actions that may fail, such as interacting with external services,
 * for example.
 *
 * If there is a problem with recovering the state of the actor from the journal, the error
 * will be logged and the actor will be stopped.
 *
 * @see [[Recovery]]
 */
def receiveRecover: Receive
 
/**
 * Command handler. Typically validates commands against current state (and/or by
 * communication with other actors). On successful validation, one or more events are
 * derived from a command and these events are then persisted by calling `persist`.
 */
def receiveCommand: Receive    

As stated, these are abstract methods that YOUR code needs to supply for the persistent actor to work properly. We will see more of this in a bit.

Persistent Id

One of the rules you must follow when using persistent actors is that the persistent actor MUST have an ID that stays the same across incarnations, since this is the ID under which its events and snapshots are stored. This can be set using the persistenceId method as shown below:

Scala
override def persistenceId = "demo-persistent-actor-1"

Snapshots

As I stated earlier, snapshots can reduce the number of events that need to be replayed against the event source target (the persistent actor).

In order to save a snapshot the actor may call the saveSnapshot method.

  • If the snapshot succeeds the actor will receive a SaveSnapshotSuccess message
  • If the snapshot fails the actor will receive a SaveSnapshotFailure message
 
Scala
var state: Any = _
  
override def receiveCommand: Receive = {
  case "snap"                                => saveSnapshot(state)
  case SaveSnapshotSuccess(metadata)         => // ...
  case SaveSnapshotFailure(metadata, reason) => // ...
}    

Where the metadata looks like this

Scala
final case class SnapshotMetadata(persistenceId: String, sequenceNr: Long, timestamp: Long = 0L)

During the recovery process the persistent actor is offered a SnapshotOffer message, from which it may restore its internal state.

After the SnapshotOffer come the newer (younger in Akka speak) events, which when replayed onto the persistent actor will get it to its final internal state.

Failure/Recovery

Special care must be taken when shutting down persistent actors from outside. For a non-persistent actor a PoisonPill may be used. This is not recommended for persistent actors, because incoming commands are stashed while the actor waits for the journal to confirm that events have been stored, and are only processed after that confirmation. The actor may therefore handle the PoisonPill before it has processed the stashed commands, and shut down prematurely. A better way is to use explicit shutdown messages.

Read more about this here :

http://doc.akka.io/docs/akka/snapshot/scala/persistence.html#Safely_shutting_down_persistent_actors
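
A minimal sketch of the explicit shutdown message approach might look like the following. The message name Shutdown, the class name SafePersistentActor and the persistenceId value are all invented for this example; it assumes the Akka 2.4.x dependencies listed earlier are on the classpath.

```scala
import akka.actor._
import akka.persistence._

// Hypothetical shutdown message, defined by us rather than by Akka
case object Shutdown

class SafePersistentActor extends PersistentActor {

  override def persistenceId = "safe-persistent-actor"

  override def receiveCommand: Receive = {
    // An ordinary message: it is stashed and processed in order like any
    // other command, so earlier persist handlers get to run first
    case Shutdown =>
      context.stop(self)
    case data: String =>
      persist(data) { stored => println(s"stored $stored") }
  }

  override def receiveRecover: Receive = {
    case _ => // nothing to rebuild in this sketch
  }
}
```

Sending a few String commands followed by Shutdown should then stop the actor only after those commands have been handled, which is the behaviour a PoisonPill does not guarantee here.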

Persisting state

So we have talked about event/commands/CQRS/event sourcing/snapshots, but so far we have not talked about how to actually save state. How is this done?

Well, as luck would have it, it's very easy: we simply call the persist method, which looks like this

Scala
def persist[A](event: A)(handler: A ⇒ Unit): Unit     
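
One thing the signature does not show is that the handler is only run once the event has been successfully stored, so state read straight after calling persist has not been updated yet. The toy model below is NOT how Akka implements persist; ToyJournal, confirmAll and PersistSketch are hypothetical names used only to illustrate that ordering in plain Scala.

```scala
import scala.collection.mutable

// Toy in-memory stand-in for a journal, for illustration only
final class ToyJournal[A] {
  private val stored = mutable.ArrayBuffer.empty[A]
  private val pendingHandlers = mutable.Queue.empty[() => Unit]

  // Same shape as persist: store the event, run the handler later
  def persist(event: A)(handler: A => Unit): Unit = {
    stored += event
    pendingHandlers.enqueue(() => handler(event))
  }

  // Simulates the journal confirming the writes: handlers run in persist order
  def confirmAll(): Unit =
    while (pendingHandlers.nonEmpty) pendingHandlers.dequeue()()

  def events: List[A] = stored.toList
}

object PersistSketch {
  def run(): List[String] = {
    val journal = new ToyJournal[String]
    var state = List.empty[String]
    def numEvents = state.size

    // Both event names are computed before any handler has run, so
    // numEvents is still 0 for the second call; hence the "+ 1"
    journal.persist(s"foo-$numEvents")(event => state = event :: state)
    journal.persist(s"foo-${numEvents + 1}")(event => state = event :: state)

    journal.confirmAll()
    state.reverse
  }
}
```

PersistSketch.run() returns List(foo-0, foo-1). Without the + 1 in the second call, both events would have been named foo-0.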

Demo Time

OK, so we have now gone through most of the bits you would need to work with persistent actors. Time for a demo.

Let's assume we have the following command that we will send to a single persistent actor

Scala
case class Cmd(data: String)    

And this type of event that we would like to store

Scala
case class Evt(data: String)    

Where we would hold this type of state within the persistent actor

Scala
case class ExampleState(events: List[String] = Nil) {
  def updated(evt: Evt): ExampleState = copy(evt.data :: events)
  def size: Int = events.length
  override def toString: String = events.reverse.toString
}    
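
To see how this state class behaves on its own, here is a small self-contained sketch. It repeats the Evt and ExampleState definitions from above inside a wrapper object (ExampleStateSketch and demo are names invented for this example) so that it can be run in isolation.

```scala
object ExampleStateSketch {
  case class Evt(data: String)

  case class ExampleState(events: List[String] = Nil) {
    // New events are prepended, so the newest event is at the head of the list
    def updated(evt: Evt): ExampleState = copy(evt.data :: events)
    def size: Int = events.length
    // Reversed for display, so the oldest event is printed first
    override def toString: String = events.reverse.toString
  }

  def demo(): ExampleState =
    ExampleState().updated(Evt("foo-0")).updated(Evt("foo-1"))
}
```

ExampleStateSketch.demo() has a size of 2 and prints as List(foo-0, foo-1), even though internally the list is held newest-first. Note that updated returns a new ExampleState each time rather than changing the existing one.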

And that the actual persistent actor looks like this

Scala
import akka.actor._
import akka.persistence._
 
 
 
class DemoPersistentActor extends PersistentActor {
 
  //note : This is  mutable
  var state = ExampleState()
 
  def updateState(event: Evt): Unit =
    state = state.updated(event)
 
  def numEvents =
    state.size
 
  val receiveRecover: Receive = {
    case evt: Evt => updateState(evt)
    case SnapshotOffer(_, snapshot: ExampleState) => {
        println(s"offered state = $snapshot")
        state = snapshot
    }
  }
 
  val receiveCommand: Receive = {
    case Cmd(data) =>
      persist(Evt(s"${data}-${numEvents}"))(updateState)
      persist(Evt(s"${data}-${numEvents + 1}")) { event =>
        updateState(event)
        context.system.eventStream.publish(event)
      }
    case "snap"  => saveSnapshot(state)
    case SaveSnapshotSuccess(metadata) =>
      println(s"SaveSnapshotSuccess(metadata) :  metadata=$metadata")
    case SaveSnapshotFailure(metadata, reason) =>
      // the s prefix is required for $metadata and $reason to be interpolated
      println(s"SaveSnapshotFailure(metadata, reason) : metadata=$metadata, reason=$reason")
    case "print" => println(state)
    case "boom"  => throw new Exception("boom")
  }
 
  override def persistenceId = "demo-persistent-actor-1"
}

Which we could first run using this demo code:

Scala
import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn
 
object Demo extends App {
 
  //create the actor system
  val system = ActorSystem("PersitenceSystem")
 
  val persistentActor =
    system.actorOf(Props(classOf[DemoPersistentActor]),
      "demo-persistent-actor-1")
 
  persistentActor ! "print"
  persistentActor ! Cmd("foo")
  persistentActor ! Cmd("baz")
  persistentActor ! "boom"
  persistentActor ! Cmd("bar")
  persistentActor ! "snap"
  persistentActor ! Cmd("buzz")
  persistentActor ! "print"
 
 
  StdIn.readLine()
 
  //shutdown the actor system
  system.terminate()
 
  StdIn.readLine()
}

Which gives the following output (your output may look a little different to mine, as I have run this code a number of times and so have state from previous runs on disk)

offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8,
foo-9, baz-10, baz-11, bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21)
List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23)
[ERROR] [08/16/2016 18:43:21.475] [PersitenceSystem-akka.actor.default-dispatcher-5]
[akka://PersitenceSystem/user/demo-persistent-actor-1] boom
java.lang.Exception: boom
    at DemoPersistentActor$$anonfun$2.applyOrElse(DemoPersistentActor.scala:39)
    at akka.actor.Actor$class.aroundReceive(Actor.scala:484)
    at DemoPersistentActor.akka$persistence$Eventsourced$$super$aroundReceive(DemoPersistentActor.scala:6)
    at akka.persistence.Eventsourced$$anon$1.stateReceive(Eventsourced.scala:657)
    at akka.persistence.Eventsourced$class.aroundReceive(Eventsourced.scala:182)
    at DemoPersistentActor.aroundReceive(DemoPersistentActor.scala:6)
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
    at akka.actor.ActorCell.invoke(ActorCell.scala:495)
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
    at akka.dispatch.Mailbox.run(Mailbox.scala:224)
    at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)

offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9,
baz-10, baz-11, bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21)
[WARN] [08/16/2016 18:43:21.494] [PersitenceSystem-akka.persistence.dispatchers.default-stream-dispatcher-8]
[akka.serialization.Serialization(akka://PersitenceSystem)] Using the default Java serializer for class
[ExampleState] which is not recommended because of performance implications. Use another serializer or
disable this warning using the setting ‘akka.actor.warn-about-java-serializer-usage’
List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23, foo-24, foo-25,
baz-26, baz-27, bar-28, bar-29, buzz-30, buzz-31)
SaveSnapshotSuccess(metadata) :  metadata=SnapshotMetadata(demo-persistent-actor-1,33,1471369401491)

And then if we were to run this demo code:

Scala
import akka.actor._
import scala.language.postfixOps
import scala.io.StdIn
 
object Demo extends App {
 
  //create the actor system
  val system = ActorSystem("PersitenceSystem")
 
  val persistentActor =
    system.actorOf(Props(classOf[DemoPersistentActor]),
      "demo-persistent-actor-1")
 
  persistentActor ! "print"
 
 
  StdIn.readLine()
 
  //shutdown the actor system
  system.terminate()
 
  StdIn.readLine()
}

Let's see what we get:

offered state = List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7,
foo-8, foo-9, baz-10, baz-11, bar-12,
bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22, buzz-23,
foo-24, foo-25, baz-26, baz-27, bar-28, bar-29)

List(foo-0, foo-1, baz-2, baz-3, bar-4, bar-5, buzz-6, buzz-7, foo-8, foo-9, baz-10, baz-11,
bar-12, bar-13, buzz-14, buzz-15, foo-16, foo-17, baz-18, baz-19, bar-20, bar-21, buzz-22,
buzz-23, foo-24, foo-25, baz-26, baz-27, bar-28, bar-29, buzz-30, buzz-31)

It can be seen that although we did not save any new events, the state of the demo persistent actor was restored correctly using a combination of a snapshot and the events that are newer than the snapshot.

Where Is The Code?

As previously stated all the code for this series will end up in this GitHub repo:

https://github.com/sachabarber/SachaBarber.AkkaExamples

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)