Last time we looked at Akka Http; this time we will look at Akka Streams.
Akka Streams is a vast topic, and you will definitely need to supplement this post with the official documentation.
The Akka team was one of the founding contributors to the Reactive Streams initiative, and Akka Streams is one implementation (there are many) of the Reactive Streams APIs.
Reactive Streams is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.
Introduction
Some readers, like me, may have come from .NET and used Rx.
You may even have heard of Reactive Streams before. So what exactly makes reactive streams different from Rx?
The big win of reactive streams over Rx is the idea of back pressure. Here is what the Akka docs say about back pressure:
The back pressure protocol is defined in terms of the number of elements a downstream Subscriber is able to receive and buffer, referred to as demand. The source of data, referred to as Publisher in Reactive Streams terminology and implemented as Source in Akka Streams, guarantees that it will never emit more elements than the received total demand for any given Subscriber.
http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html#back-pressure-explained
Luckily this is all built into Akka Streams, so as a user you do not have to worry about it too much.
You can pretty much decide how the built-in stream stages (which we will dive into in more detail below) behave when a buffer fills up by picking an OverflowStrategy value. Here is a very simple example:
Source(1 to 10).buffer(10, OverflowStrategy.backpressure)
The following OverflowStrategy values are available:
object OverflowStrategy {
  def dropHead: OverflowStrategy = DropHead
  def dropTail: OverflowStrategy = DropTail
  def dropBuffer: OverflowStrategy = DropBuffer
  def dropNew: OverflowStrategy = DropNew
  def backpressure: OverflowStrategy = Backpressure
  def fail: OverflowStrategy = Fail
}
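To get a feel for what the dropping strategies do when a buffer is full, here is a small plain-Scala model. It is only a sketch and not Akka's implementation: the `OverflowModel` object, its `Strategy` type and the `offer`/`fill` helpers are names made up for this illustration. The backpressure and fail strategies are left out because they never drop anything: the first stops the upstream from sending more, the second fails the stream.

```scala
import scala.collection.immutable.Queue

object OverflowModel {
  // Hypothetical names for this sketch; these are NOT Akka's own classes
  sealed trait Strategy
  case object DropHead extends Strategy
  case object DropTail extends Strategy
  case object DropBuffer extends Strategy
  case object DropNew extends Strategy

  // Offer one element to a bounded buffer (capacity >= 1) and return the new buffer
  def offer[A](buffer: Queue[A], capacity: Int, elem: A, strategy: Strategy): Queue[A] =
    if (buffer.size < capacity) buffer.enqueue(elem) // room left, nothing is dropped
    else strategy match {
      case DropHead   => buffer.tail.enqueue(elem) // drop the oldest buffered element
      case DropTail   => buffer.init.enqueue(elem) // drop the youngest buffered element
      case DropBuffer => Queue(elem)               // drop everything that was buffered
      case DropNew    => buffer                    // drop the incoming element
    }

  // Push 1 to 5 into a buffer of size 3 that nobody is draining
  def fill(strategy: Strategy): List[Int] =
    (1 to 5).foldLeft(Queue.empty[Int])((buf, x) => offer(buf, 3, x, strategy)).toList
}
```

With nothing draining the buffer, `OverflowModel.fill(OverflowModel.DropHead)` ends up holding 3, 4, 5, whereas `DropNew` keeps 1, 2, 3.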
So that is the basic idea, Akka streams does provide a lot of stuff, such as
- Built in stages/shapes
- A graph API
- Ability to create your own stages/shapes
For the rest of this post we will be looking at some examples of these 3 points.
Working With The Akka Streams APIs
As stated at the beginning of this post the Akka Streams implementation is vast. There is a lot of ground to cover, far more than I can reasonably cover in a small blog post. The official docs are still the place to go, but if you have not heard of Akka Streams this post may be enough to get you into it.
The official docs (at time of writing) are here:
http://doc.akka.io/docs/akka/2.4.2/scala/stream/index.html
Working With Built In Stages/Shapes
Akka comes with loads of prebuilt stages which we can make use of. However, before I mention those, let's spend a bit of time talking about how you use the Akka Streams APIs in their most basic form.
The idea is that we have 4 different parts that make up a useable pipeline.
Source
A processing stage with exactly one output, emitting data elements whenever downstream processing stages are ready to receive them.
Sink
A processing stage with exactly one input, requesting and accepting data elements, possibly slowing down the upstream producer of elements.
Flow
A processing stage which has exactly one input and output, which connects its up- and downstreams by transforming the data elements flowing through it.
RunnableGraph
A Flow that has both ends “attached” to a Source and Sink respectively, and is ready to be run().
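To make the four parts concrete, here is a minimal sketch that builds each one separately and then wires them together. It assumes the Akka 2.4.x style of setup used elsewhere in this post (an implicit ActorSystem and ActorMaterializer); the `FourPartsSketch` object and `evenSquares` method are names invented for this illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical object name, used only for this sketch
object FourPartsSketch {
  def evenSquares(): Seq[Int] = {
    implicit val system = ActorSystem("FourPartsSketch")
    implicit val materializer = ActorMaterializer()
    try {
      // Source: exactly one output
      val source: Source[Int, NotUsed] = Source(1 to 10)
      // Flow: exactly one input and one output
      val flow: Flow[Int, Int, NotUsed] = Flow[Int].filter(_ % 2 == 0).map(x => x * x)
      // Sink: exactly one input, materializes the collected elements
      val sink: Sink[Int, Future[Seq[Int]]] = Sink.seq[Int]
      // RunnableGraph: both ends attached, nothing runs until run() is called
      val graph: RunnableGraph[Future[Seq[Int]]] = source.via(flow).toMat(sink)(Keep.right)
      Await.result(graph.run(), 5.seconds)
    } finally system.terminate()
  }
}
```

Calling `FourPartsSketch.evenSquares()` should give back the squares of the even numbers: 4, 16, 36, 64, 100.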
As I say, Akka comes with loads of inbuilt stages to make our lives easier here. For example, these are the available stages at time of writing:
Source Stages
- fromIterator
- apply
- single
- repeat
- tick
- fromFuture
- fromCompletionStage
- unfold
- unfoldAsync
- empty
- maybe
- failed
- actorPublisher
- actorRef
- combine
- queue
- asSubscriber
- fromPublisher
- fromFile
Sink Stages
- head
- headOption
- last
- lastOption
- ignore
- cancelled
- seq
- foreach
- foreachParallel
- onComplete
- fold
- reduce
- combine
- actorRef
- actorRefWithAck
- actorSubscriber
- asPublisher
- fromSubscriber
- toFile
We will now look at some examples of using some of these:
def simpleFlow() : Unit = {
  val source = Source(1 to 10)
  val sink = Sink.fold[Int, Int](0)(_ + _)
  // connect the Source to the Sink, obtaining a RunnableGraph
  val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
  implicit val timeout = Timeout(5.seconds)
  // materialize the graph and get the value of the fold Sink
  val sumFuture: Future[Int] = runnable.run()
  val sum = Await.result(sumFuture, timeout.duration)
  println(s"source.toMat(sink)(Keep.right) Sum = $sum")
  // runWith is a shortcut for toMat(sink)(Keep.right).run()
  val sumFuture2: Future[Int] = source.runWith(sink)
  val sum2 = Await.result(sumFuture2, timeout.duration)
  println(s"source.runWith(sink) Sum = $sum2")
}
In this simple example we have a Source(1 to 10) which we then wire up to a Sink which adds the numbers coming in.
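The Keep.right in that example picks which materialized value the RunnableGraph hands back: the one from the left side (the Source) or the one from the right side (the Sink). The following sketch uses Keep.both to show the two side by side. It assumes the same Akka 2.4.x setup as the rest of the post, and `KeepSketch` is just a name made up for the illustration.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical object name, used only for this sketch
object KeepSketch {
  def run(): (NotUsed, Int) = {
    implicit val system = ActorSystem("KeepSketch")
    implicit val materializer = ActorMaterializer()
    try {
      val source: Source[Int, NotUsed] = Source(1 to 10)               // materializes NotUsed
      val sink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _) // materializes Future[Int]
      // Keep.both hands back a tuple of (Source's value, Sink's value)
      val (fromSource, fromSink) = source.toMat(sink)(Keep.both).run()
      (fromSource, Await.result(fromSink, 5.seconds))
    } finally system.terminate()
  }
}
```

Here the Source has nothing interesting to offer (NotUsed), which is why Keep.right is the usual choice when the Sink carries the result.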
This block demonstrates various different Sources and Sinks:
def differentSourcesAndSinks() : Unit = {
  // various sources
  Source(List(1, 2, 3)).runWith(Sink.foreach(println))
  Source.single("only one element").runWith(Sink.foreach(println))
  // actor sink: DoneMessage is sent to the actor when the stream completes
  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
  Source(List("hello", "hello"))
    .runWith(Sink.actorRef(helloActor, DoneMessage))
  // future source
  val futureString = Source.fromFuture(Future.successful("Hello Streams!"))
    .toMat(Sink.head)(Keep.right).run()
  implicit val timeout = Timeout(5.seconds)
  val theString = Await.result(futureString, timeout.duration)
  println(s"theString = $theString")
}
And this block demos using a simple map on a Source:
def mapFlow() : Unit = {
  val source = Source(11 to 16)
  val doublerSource = source.map(x => x * 2)
  val sink = Sink.foreach(println)
  implicit val timeout = Timeout(5.seconds)
  // materialize the flow, and wait for the Sink to finish printing
  val printSinkFuture: Future[Done] = doublerSource.runWith(sink)
  Await.result(printSinkFuture, timeout.duration)
}
Working With The Graph API
Akka streams also comes with a pretty funky graph building DSL. You would use this when you want to create quite elaborate flows.
The other very interesting thing about the graph builder DSL is that you can use custom shapes inside it, and you can also leave it partially connected, so that you could potentially use it as a Source/Sink.
Let's say you had an unconnected output in the graph you built using the graph DSL; you could then use that partially constructed graph as a Source in its own right.
The same goes for an unconnected input: if the graph you created had one, you could use that graph as a Sink.
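As a rough illustration of a partial graph used as a Source, the sketch below closely follows the example in the official docs: two filtered number streams are zipped inside the graph, and the zip's unconnected output becomes the output of a new Source. The `PartialGraphSourceSketch` object and `firstPair` method are names made up here, and the setup again assumes Akka 2.4.x.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, SourceShape}
import akka.stream.scaladsl.{GraphDSL, Sink, Source, Zip}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name, used only for this sketch
object PartialGraphSourceSketch {
  def firstPair(): (Int, Int) = {
    implicit val system = ActorSystem("PartialGraphSourceSketch")
    implicit val materializer = ActorMaterializer()
    try {
      val pairs: Source[(Int, Int), NotUsed] = Source.fromGraph(GraphDSL.create() { implicit b =>
        import GraphDSL.Implicits._
        val zip = b.add(Zip[Int, Int]())
        def ints = Source.fromIterator(() => Iterator.from(1))
        // odd numbers feed one zip input, even numbers the other
        ints.filter(_ % 2 != 0) ~> zip.in0
        ints.filter(_ % 2 == 0) ~> zip.in1
        // the zip output is left unconnected and exposed as the Source's output
        SourceShape(zip.out)
      })
      // Sink.head cancels the (infinite) stream after the first element
      Await.result(pairs.runWith(Sink.head), 5.seconds)
    } finally system.terminate()
  }
}
```

The first pair that comes out should be (1, 2).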
You can read more about this here :
http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-graphs.html#constructing-sources-sinks-and-flows-from-partial-graphs
I urge you all to have a read of that, as it's quite cool what can be done with the graph DSL.
OK, so time for an example. This example comes directly from the Typesafe activator code:
http://www.lightbend.com/activator/template/akka-stream-scala
package com.sas.graphs
import java.io.File
import java.util.concurrent.ThreadLocalRandom
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import akka.util.ByteString
import scala.util.{ Failure, Success }
class WritePrimesDemo {
  def run(): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher
    implicit val materializer = ActorMaterializer()
    // generate random numbers
    val maxRandomNumberSize = 1000000
    val primeSource: Source[Int, NotUsed] =
      Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize))).
        // filter prime numbers
        filter(rnd => isPrime(rnd)).
        // and neighbor +2 is also prime
        filter(prime => isPrime(prime + 2))
    // write to file sink
    val fileSink = FileIO.toPath(new File("target/primes.txt").toPath)
    val slowSink = Flow[Int]
      // act as if processing is really slow
      .map(i => { Thread.sleep(1000); ByteString(i.toString) })
      .toMat(fileSink)((_, bytesWritten) => bytesWritten)
    // console output sink
    val consoleSink = Sink.foreach[Int](println)
    // send primes to both the slow file sink and the console sink using the graph API
    val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
      (slow, console) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
        primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
        broadcast ~> console // connect other side of splitter to console
        ClosedShape
    }
    val materialized = RunnableGraph.fromGraph(graph).run()
    // ensure the output file is closed and the system shut down upon completion
    materialized.onComplete {
      case Success(_) =>
        system.terminate()
      case Failure(e) =>
        println(s"Failure: ${e.getMessage}")
        system.terminate()
    }
  }
  def isPrime(n: Int): Boolean = {
    if (n <= 1) false
    else if (n == 2) true
    else !(2 to (n - 1)).exists(x => n % x == 0)
  }
}
The most important part of this code is this part:
val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
  (slow, console) =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Int](2))
    primeSource ~> broadcast ~> slow
    broadcast ~> console
    ClosedShape
}
val materialized = RunnableGraph.fromGraph(graph).run()
There are two sinks defined before we use the graph:
- A file Sink
- A console Sink
There is also a Source that generates random primes
So the Graph DSL allows you to, um, well, create graphs. It allows you to take in inputs and create other shapes using the implicit builder that is provided.
The DSL then allows you to connect those inputs and the stages/shapes created through the builder to each other, and even expose unconnected ports as outputs.
This is done using the ~> syntax, which simply means connect.
As previously stated you can create partially connected graphs, but if you have all inputs and outputs connected the graph is considered a ClosedShape, which can be turned into a RunnableGraph and run as an isolated component.
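Here is a smaller closed graph that can be run on its own, as a sketch of the ~> wiring with both a fan-out (Broadcast) and a fan-in (Merge). The `ClosedGraphSketch` object and its `run` method are hypothetical names, and the materializer setup assumes Akka 2.4.x as in the rest of this post. The order in which Merge emits the two branches is not guaranteed, so the result is sorted before being returned.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, ClosedShape}
import akka.stream.scaladsl.{Broadcast, Flow, GraphDSL, Merge, RunnableGraph, Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._

// Hypothetical object name, used only for this sketch
object ClosedGraphSketch {
  def run(): Seq[Int] = {
    implicit val system = ActorSystem("ClosedGraphSketch")
    implicit val materializer = ActorMaterializer()
    try {
      val graph = RunnableGraph.fromGraph(GraphDSL.create(Sink.seq[Int]) { implicit b => sink =>
        import GraphDSL.Implicits._
        val bcast = b.add(Broadcast[Int](2)) // one input, two outputs
        val merge = b.add(Merge[Int](2))     // two inputs, one output
        // every element goes down both branches, then both branches are merged again
        Source(1 to 3) ~> bcast ~> Flow[Int].map(_ * 10) ~> merge ~> sink
                          bcast ~> Flow[Int].map(_ + 1)  ~> merge
        // every port is connected, so the graph is closed
        ClosedShape
      })
      // Merge does not promise an ordering between its inputs, hence the sort
      Await.result(graph.run(), 5.seconds).sorted
    } finally system.terminate()
  }
}
```

After sorting, the result should contain 2, 3, 4 (from the + 1 branch) and 10, 20, 30 (from the * 10 branch).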
Running this graph prints the generated primes to the console, while the slow sink writes the same values to target/primes.txt.
Create Custom Shapes/Stages
It doesn’t stop there; we can also create our own shapes that can be used in flows. This is a pretty complex subject and you will definitely benefit from reading this page:
http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html
There is no way this little post will cover enough, but here are some highlights of the official documentation
This is the basic pattern you would use to create a custom stage
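import akka.stream.{Attributes, Outlet, SourceShape}
import akka.stream.stage.{GraphStage, GraphStageLogic}
class NumbersSource extends GraphStage[SourceShape[Int]] {
  // Define the (sole) output port of this stage
  val out: Outlet[Int] = Outlet("NumbersSource")
  // Define the shape of this stage, which is SourceShape with the port we defined above
  override val shape: SourceShape[Int] = SourceShape(out)
  // This is where the actual (possibly stateful) logic will live
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
}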
Most of the actual logic will be inside the createLogic method. But in order to do anything useful in there you will need to use handlers. Handlers are what you use to handle input/output. There are two kinds: InHandler and OutHandler.
Each of these has its own state machine, which governs when a port may be pulled or pushed and when it is closed. The official documentation has the state diagrams for both OutHandler and InHandler.
This is the best page to read to learn more about these handlers
http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html#Port_states__InHandler_and_OutHandler
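The one and ONLY place that mutable state should be maintained is within the GraphStageLogic that the createLogic method returns, never in the GraphStage itself, since the same stage may be materialized more than once.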
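As a sketch of how the `???` in the NumbersSource skeleton could be filled in, the version below closely follows the official docs: the counter is the stage's only state and lives inside the GraphStageLogic, and the OutHandler pushes one number each time downstream pulls. The `NumbersSourceSketch` wrapper and its `firstFive` method are names invented for this illustration, with the usual Akka 2.4.x setup assumed.

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, Attributes, Outlet, SourceShape}
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.stage.{GraphStage, GraphStageLogic, OutHandler}
import scala.concurrent.Await
import scala.concurrent.duration._

class NumbersSource extends GraphStage[SourceShape[Int]] {
  val out: Outlet[Int] = Outlet("NumbersSource")
  override val shape: SourceShape[Int] = SourceShape(out)

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
    new GraphStageLogic(shape) {
      // All mutable state lives here, so every materialization gets a fresh counter
      private var counter = 1

      setHandler(out, new OutHandler {
        // Called when downstream signals demand: push exactly one element
        override def onPull(): Unit = {
          push(out, counter)
          counter += 1
        }
      })
    }
}

// Hypothetical wrapper, used only to run the sketch
object NumbersSourceSketch {
  def firstFive(): Seq[Int] = {
    implicit val system = ActorSystem("NumbersSourceSketch")
    implicit val materializer = ActorMaterializer()
    try {
      // the source is infinite, so take(5) is what ends the stream
      Await.result(Source.fromGraph(new NumbersSource).take(5).runWith(Sink.seq), 5.seconds)
    } finally system.terminate()
  }
}
```

Because the counter sits inside createLogic, running the same NumbersSource instance twice starts from 1 both times.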
Let's consider a small example. Let's say we have some objects like this:
case class Element(id: Int, value: Int)
And we want to build a custom stage that lets us select a property from this type, and that only emits an element when the selected property differs from that of the previous element.
We could call this DistinctUntilChanged. Let's see what an example of this could look like:
package com.sas.customshapes
import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler, GraphStage}
import akka.stream.{Outlet, Attributes, Inlet, FlowShape}
final class DistinctUntilChanged[E, P](propertyExtractor: E => P)
  extends GraphStage[FlowShape[E, E]] {
  val in = Inlet[E]("DistinctUntilChanged.in")
  val out = Outlet[E]("DistinctUntilChanged.out")
  override def shape = FlowShape.of(in, out)
  override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {
    // the previous element seen; kept inside the logic so each materialization has its own
    private var savedState : Option[E] = None
    setHandlers(in, out, new InHandler with OutHandler {
      override def onPush(): Unit = {
        val nextElement = grab(in)
        val nextState = propertyExtractor(nextElement)
        if (savedState.isEmpty || propertyExtractor(savedState.get) != nextState) {
          // first element, or the selected property changed: emit it
          push(out, nextElement)
        }
        else {
          // same property as the previous element: skip it and ask for the next one
          pull(in)
        }
        savedState = Some(nextElement)
      }
      override def onPull(): Unit = {
        pull(in)
      }
      override def onUpstreamFinish(): Unit = {
        completeStage()
      }
    })
    override def postStop(): Unit = {
      savedState = None
    }
  }
}
The highlights of this are:
- We have a single Inlet
- We have a single Outlet
- We expose a FlowShape; there are many shapes, but FlowShape is what we want for one input and one output
- We use createLogic to do the work
- We use an InHandler to handle input
- We use an OutHandler to handle output
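One other important thing (at least for this single in/out example) is that we DO NOT call pull or push more than once per callback: each onPush either pushes one element downstream or pulls one more from upstream, and each onPull pulls exactly once.
Let's assume we have these elements: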
package com.sas.customshapes
import scala.collection.immutable
object SampleElements {
  val E11 = Element(1, 1)
  val E21 = Element(2, 1)
  val E31 = Element(3, 1)
  val E42 = Element(4, 2)
  val E52 = Element(5, 2)
  val E63 = Element(6, 3)
  val Ones = immutable.Seq(E11, E21, E31)
  val Twos = immutable.Seq(E42, E52)
  val Threes = immutable.Seq(E63)
  val All = Ones ++ Twos ++ Threes
}
And this demo code:
def runDistinctUntilChanged() : Unit = {
  Source(SampleElements.All)
    .via(new DistinctUntilChanged(_.value))
    .runWith(Sink.foreach(println))
}
We would get this output to the Sink:
Element(1,1)
Element(4,2)
Element(6,3)
This example does owe a lot to a nice blog post I found here :
https://www.softwaremill.com/implementing-a-custom-akka-streams-graph-stage/
That’s It
Anyway, that is the end of the series. I hope you have enjoyed it, and have learnt you some Akka along the way.
I am going to have a small break now and then start looking into some Azure/Web stuff, I think.
Where Can I Find The Code Examples?
I will be augmenting this GitHub repo with the example projects as I move through this series
https://github.com/sachabarber/SachaBarber.AkkaExamples