Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / DSL

AKKA STREAMS

5.00/5 (5 votes)
13 Dec 2016CPOL7 min read 7.2K  
Last time we looked at Akka Http, this time we will look at Akka Streams. Akka Streams is a vast topic, and you will definitely need to supplement this  post with the official documentation.

Last time we looked at Akka Http, this time we will look at Akka Streams.

Akka Streams is a vast topic, and you will definitely need to supplement this  post with the official documentation.

Akka Streams is one of the founding members of Reactive Streams, and Akka streams is one implementation (there are many) of the Reactive Streams APIs.

Reactive Streams  is an initiative to provide a standard for asynchronous stream processing with non-blocking back pressure. This encompasses efforts aimed at runtime environments (JVM and JavaScript) as well as network protocols.

Introduction

There may be some readers who have come from .NET such as myself who have used RX.

You may even have heard of Reactive Streams before. So what exactly makes reactive streams different from Rx?

The central thing that is the big win with reactive streams over Rx is the idea of back pressure. Here is what the Akka docs say about back pressure

The back pressure protocol is defined in terms of the number of elements a downstream Subscriber is able to receive and buffer, referred to as demand. The source of data, referred to as Publisher in Reactive Streams terminology and implemented as Source in Akka Streams, guarantees that it will never emit more elements than the received total demand for any given Subscriber.

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-flows-and-basics.html#back-pressure-explained

Luckily this is all inbuilt to Akka streams, you do not have to worry about this too much as a user of Akka streams.

You can pretty much decide how you want the built in streams pipelines (which we will be diving into in more details below) in terms of backpressure using the OverflowStrategy enum value. Here is a very simple example

Source(1 to 10).buffer(10, OverflowStrategy.backpressure)

Where the following are the available OverflowStrategy values

object OverflowStrategy {
  /**
   * If the buffer is full when a new element arrives, drops the oldest element from the buffer to make space for
   * the new element.
   */
  def dropHead: OverflowStrategy = DropHead
 
  /**
   * If the buffer is full when a new element arrives, drops the youngest element from the buffer to make space for
   * the new element.
   */
  def dropTail: OverflowStrategy = DropTail
 
  /**
   * If the buffer is full when a new element arrives, drops all the buffered elements to make space for the new element.
   */
  def dropBuffer: OverflowStrategy = DropBuffer
 
  /**
   * If the buffer is full when a new element arrives, drops the new element.
   */
  def dropNew: OverflowStrategy = DropNew
 
  /**
   * If the buffer is full when a new element is available this strategy backpressures the upstream publisher until
   * space becomes available in the buffer.
   */
  def backpressure: OverflowStrategy = Backpressure
 
  /**
   * If the buffer is full when a new element is available this strategy completes the stream with failure.
   */
  def fail: OverflowStrategy = Fail
}

So that is the basic idea, Akka streams does provide a lot of stuff, such as

  • Built in stages/shapes
  • A graph API
  • Ability to create your own stages/shapes

For the rest of this post we will be looking at some examples of these 3 points.

Working With The Akka Streams APIs

As stated at the beginning of this post the Akka Streams implementation is vast. There is a lot of ground to cover, far more than I can reasonably cover in a small blog post. The official docs are still the place to go, but if you have not heard of Akka Streams this post may be enough to get you into it.

The official docs (at time of writing) are here:

http://doc.akka.io/docs/akka/2.4.2/scala/stream/index.html

Working With Built In Stages/Shapes

Akka comes with loads of prebuilt stages which we can make use of. However before I mention those lets try and just spend a bit of time taking a bit about how you use the Akka Streams APIs in their most basic form.

The idea is that we have 4 different parts that make up a useable pipeline.

Source
A processing stage with exactly one output, emitting data elements whenever downstream processing stages are ready to receive them.

Sink
A processing stage with exactly one input, requesting and accepting data elements possibly slowing down the upstream producer of elements

Flow
A processing stage which has exactly one input and output, which connects its up- and downstreams by transforming the data elements flowing through it.

RunnableGraph
A Flow that has both ends “attached” to a Source and Sink respectively, and is ready to be run().

As I say Akka comes with loads of inbuilt stages to make our lives easier here. For example these are the available stages at time of writing

Source Stages

  • fromIterator
  • apply
  • single
  • repeat
  • tick
  • fromFuture
  • fromCompletionStage
  • unfold
  • unfoldAsync
  • empty
  • maybe
  • failed
  • actorPublisher
  • actorRef
  • combine
  • queue
  • asSubscriber
  • fromPublisher
  • fromFile

Sink Stages

  • head
  • headOption
  • last
  • lastOption
  • ignore
  • cancelled
  • seq
  • foreach
  • foreachParallel
  • onComplete
  • fold
  • reduce
  • combine
  • actorRef
  • actorRefWithAck
  • actorSubscriber
    asPublisher
  • fromSubscriber
  • toFile

We will now look at some example of using some of these

def simpleFlow() : Unit = {
  val source = Source(1 to 10)
  val sink = Sink.fold[Int, Int](0)(_ + _)
  // connect the Source to the Sink, obtaining a RunnableGraph
  val runnable: RunnableGraph[Future[Int]] = source.toMat(sink)(Keep.right)
  // materialize the flow and get the value of the FoldSink
  implicit val timeout = Timeout(5 seconds)
  val sumFuture: Future[Int] = runnable.run()
  val sum = Await.result(sumFuture, timeout.duration)
  println(s"source.toMat(sink)(Keep.right) Sum = $sum")
 
  // Use the shorthand source.runWith(sink)
  val sumFuture2: Future[Int] = source.runWith(sink)
  val sum2 = Await.result(sumFuture2, timeout.duration)
  println(s"source.runWith(sink) Sum = $sum")
}

In this simple example we have s Source(1 to 10) which we then wire up to a Sink which adds the numbers coming in.

This block demonstrates various different Source(s) and Sink(s)

def differentSourcesAndSinks() : Unit = {
  //various sources
  Source(List(1, 2, 3)).runWith(Sink.foreach(println))
  Source.single("only one element").runWith(Sink.foreach(println))
  //actor sink
  val helloActor = system.actorOf(Props[HelloActor], name = "helloactor")
  Source(List("hello", "hello"))
    .runWith(Sink.actorRef(helloActor,DoneMessage))
  //future source
  val futureString = Source.fromFuture(Future.successful("Hello Streams!"))
    .toMat(Sink.head)(Keep.right).run()
  implicit val timeout = Timeout(5 seconds)
  val theString = Await.result(futureString, timeout.duration)
  println(s"theString = $theString")
}

And this block demos using a simple Map on a Source

def mapFlow() : Unit = {
  val source = Source(11 to 16)
  val doublerSource = source.map(x => x * 2)
  val sink = Sink.foreach(println)
  implicit val timeout = Timeout(5 seconds)
 
  // Use the shorthand source.runWith(sink)
  val printSinkFuture: Future[Done] = doublerSource.runWith(sink)
  Await.result(printSinkFuture, timeout.duration)
}

Working With The Graph API

Akka streams also comes with a pretty funky graph building DSL. You would use this when you want to create quite elaborate flows.

The other very interesting thing about the graph builder DSL is that you can use custom shapes inside it, and you can also leave it partially connected. Such that you could potentially use it as a Source/Sink.

Lets say you had an output from the graph you built using the graph DSL, you could then use that partially constructed graph as a Source in its own right.

The same goes if you had an unconnected input in the graph you created you could use that as a Sink.

You can read more about this here :

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-graphs.html#constructing-sources-sinks-and-flows-from-partial-graphs

I urge you all to have a read of that as its quite cool what can be done with the graph DSL

Ok so time for an example, this example comes directly from the TypeSafe activator code

http://www.lightbend.com/activator/template/akka-stream-scala

package com.sas.graphs
 
import java.io.File
 
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.ClosedShape
import akka.stream.scaladsl._
import akka.util.ByteString
 
import scala.concurrent.forkjoin.ThreadLocalRandom
import scala.util.{ Failure, Success }
 
class WritePrimesDemo {
 
  def run(): Unit = {
    implicit val system = ActorSystem("Sys")
    import system.dispatcher
    implicit val materializer = ActorMaterializer()
 
    // generate random numbers
    val maxRandomNumberSize = 1000000
    val primeSource: Source[Int, NotUsed] =
      Source.fromIterator(() => Iterator.continually(ThreadLocalRandom.current().nextInt(maxRandomNumberSize))).
        // filter prime numbers
        filter(rnd => isPrime(rnd)).
        // and neighbor +2 is also prime
        filter(prime => isPrime(prime + 2))
 
    // write to file sink
    val fileSink = FileIO.toPath(new File("target/primes.txt").toPath)
    val slowSink = Flow[Int]
      // act as if processing is really slow
      .map(i => { Thread.sleep(1000); ByteString(i.toString) })
      .toMat(fileSink)((_, bytesWritten) => bytesWritten)
 
    // console output sink
    val consoleSink = Sink.foreach[Int](println)
 
    // send primes to both slow file sink and console sink using graph API
    val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
      (slow, console) =>
        import GraphDSL.Implicits._
        val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
        primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
        broadcast ~> console // connect other side of splitter to console
        ClosedShape
    }
    val materialized = RunnableGraph.fromGraph(graph).run()
 
    // ensure the output file is closed and the system shutdown upon completion
    materialized.onComplete {
      case Success(_) =>
        system.terminate()
      case Failure(e) =>
        println(s"Failure: ${e.getMessage}")
        system.terminate()
    }
 
  }
 
  def isPrime(n: Int): Boolean = {
    if (n <= 1) false
    else if (n == 2) true
    else !(2 to (n - 1)).exists(x => n % x == 0)
  }
}

The most important part of this code is this part

// send primes to both slow file sink and console sink using graph API
val graph = GraphDSL.create(slowSink, consoleSink)((slow, _) => slow) { implicit builder =>
  (slow, console) =>
    import GraphDSL.Implicits._
    val broadcast = builder.add(Broadcast[Int](2)) // the splitter - like a Unix tee
    primeSource ~> broadcast ~> slow // connect primes to splitter, and one side to file
    broadcast ~> console // connect other side of splitter to console
    ClosedShape
}
val materialized = RunnableGraph.fromGraph(graph).run()

There is 2 sinks defined before we use the Graph

  • A file Sink
  • A console Sink

There is also a Source that generates random primes

So the Graph DSL allows you to um well create graphs. It allows you to take in inputs and create other shapes using the implicit builder that is provided.

The DSL then allows you to connect inputs/other builder creates stages/shapes to the inputs and even expose the connected stages to an output.

This is done using the ~> syntax than simply means connect

As previously stated you can create partially connected graphs, but if you have all inputs and outputs connected it is considered a ClosedShape, that can be used as an isolated component

Here is an example of the output of running this graph example

Image 1

Create Custom Shapes/Stages

It doesn’t stop there, we can also create out own shapes that can be used in flows. This is a pretty complex subject and you will definitely benefit from reading this page

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html

There is no way this little post will cover enough, but here are some highlights of the official documentation

This is the basic pattern you would use to create a custom stage

import akka.stream.SourceShape
import akka.stream.stage.GraphStage
  
class NumbersSource extends GraphStage[SourceShape[Int]] {
  // Define the (sole) output port of this stage
  val out: Outlet[Int] = Outlet("NumbersSource")
  // Define the shape of this stage, which is SourceShape with the port we defined above
  override val shape: SourceShape[Int] = SourceShape(out)
  
  // This is where the actual (possibly stateful) logic will live
  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic = ???
}

Most of the actual logic will be inside the createLogic method. But in order to do anything useful in there you will need to use handlers. Handlers are what you use to handle input/output. There are InHandler and OutHandler.

Each of which has its own state machine flow. For example this is the state machine for an OutHandler

Image 2

Whilst this is the one for InHandler

Image 3

This is the best page to read to learn more about these handlers

http://doc.akka.io/docs/akka/2.4.2/scala/stream/stream-customize.html#Port_states__InHandler_and_OutHandler

The one and ONLY place that state should be maintained is within the createLogic method.

Lets consider a small example. Lets say we have some objects like this

case class Element(id: Int, value: Int)

And we want to build a custom stage that will allow us to select a value from this type, and should only emit an output value for unique values as provided by the property selector.

We could call this DistinctUntilChanged. Lets see what an example for this could look like

package com.sas.customshapes
 
import akka.stream.stage.{GraphStageLogic, InHandler, OutHandler, GraphStage}
import akka.stream.{Outlet, Attributes, Inlet, FlowShape}
 
import scala.collection.immutable
 
final class DistinctUntilChanged[E, P](propertyExtractor: E => P)
  extends GraphStage[FlowShape[E, E]] {
 
  val in = Inlet[E]("DistinctUntilChanged.in")
  val out = Outlet[E]("DistinctUntilChanged.out")
 
  override def shape = FlowShape.of(in, out)
 
  override def createLogic(attributes: Attributes) = new GraphStageLogic(shape) {
 
    private var savedState : Option[E] = None
 
    setHandlers(in, out, new InHandler with OutHandler {
 
      override def onPush(): Unit = {
        val nextElement = grab(in)
        val nextState = propertyExtractor(nextElement)
 
        if (savedState.isEmpty  || propertyExtractor(savedState.get) != nextState) {
          savedState = Some(nextElement)
          push(out, savedState.get)
        }
        else {
          pull(in)
        }
        savedState = Some(nextElement)
      }
 
      override def onPull(): Unit = {
        pull(in)
      }
 
      override def onUpstreamFinish(): Unit = {
        completeStage()
      }
    })
 
    override def postStop(): Unit = {
      savedState = None
    }
  }
}

The highlights of this are

  • We have a single Inlet
  • We have a single Outlet
  • We expose a FlowShape (in/out only) there are many shapes but FlowShape is what we want for one in/out out
  • We use createLogic to do the work
  • We use an InHandler to handle input
  • We use an OutHandler to handle output

One other important thing (at least for this single in/out example) is that we DO NOT call pull/push more than once in the createLogic

Lets assume we have these elements

package com.sas.customshapes
 
import scala.collection.immutable
 
object SampleElements {
 
  val E11 = Element(1, 1)
  val E21 = Element(2, 1)
  val E31 = Element(3, 1)
  val E42 = Element(4, 2)
  val E52 = Element(5, 2)
  val E63 = Element(6, 3)
 
  val Ones = immutable.Seq(E11, E21, E31)
  val Twos = immutable.Seq(E42, E52)
  val Threes = immutable.Seq(E63)
 
  val All = Ones ++ Twos ++ Threes
}

And this demo code

def runDistinctUntilChanged() : Unit = {
  Source(SampleElements.All)
    .via(new DistinctUntilChanged(_.value))
    .runWith(Sink.foreach(println))
}

We would get this output to the Sink

Image 4

This example does owe a lot to a nice blog post I found here :

https://www.softwaremill.com/implementing-a-custom-akka-streams-graph-stage/

That’s It

Anyway that is the end of the series I hope you have enjoyed it, and have learnt you some Akka along the way

I am going to have a small break now and then start looking into some Azure/Web stuff I think

Where Can I Find The Code Examples?

I will be augmenting this GitHub repo with the example projects as I move through this series

https://github.com/sachabarber/SachaBarber.AkkaExamples

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)