Last time we talked about routing within Akka. This time we will be looking at Akkas support for http.
But just before that, a bit of history. Before Akka.Http there was already a fairly successful Akk based http option available to you as a Scala developer, called Spray. There is a lot of Spray documentation available here http://spray.io/
This framework was extremely well thought of, so much so that the good people at Akka have taken on much of the good work done by this team, and it now forms much of the codebase for Akka Http.
In fact if you are familiar with Spray, you will certainly notice quite a lot of similarities in the way routes and JSON are handled in Akka.Http, as it is pretty much the Spray code.
Introduction
Akka.Http comes with server side and client side libraries. It also comes with good support for standard serialization such as JSON/XML and the ability to roll your own serialization should you want to.
It also comes with a fairly nifty routing DSL which is very much inspired by the work done in Spray.
This post will concentrate on the common use cases that you may come across when working with HTTP.
SBT Dependencies
As usual we need to make sure we have the correct JARs referenced. So here is the SBT file that I am using for both the server side/client side and common messages that pass between them
import sbt._
import sbt.Keys._
lazy val allResolvers = Seq(
"Typesafe Repository" at "http://repo.typesafe.com/typesafe/releases/",
"Akka Snapshot Repository" at "http://repo.akka.io/snapshots/"
)
lazy val AllLibraryDependencies =
Seq(
"com.typesafe.akka" % "akka-actor_2.11" % "2.4.12",
"com.typesafe.akka" % "akka-http_2.11" % "3.0.0-RC1",
"com.typesafe.akka" % "akka-http-core_2.11" % "3.0.0-RC1",
"com.typesafe.akka" % "akka-http-spray-json_2.11" % "3.0.0-RC1"
)
lazy val commonSettings = Seq(
version := "1.0",
scalaVersion := "2.11.8",
resolvers := allResolvers,
libraryDependencies := AllLibraryDependencies
)
lazy val serverside =(project in file("serverside")).
settings(commonSettings: _*).
settings(
name := "serverside"
)
.aggregate(common, clientside)
.dependsOn(common, clientside)
lazy val common = (project in file("common")).
settings(commonSettings: _*).
settings(
name := "common"
)
lazy val clientside = (project in file("clientside")).
settings(commonSettings: _*).
settings(
name := "clientside"
)
.aggregate(common)
.dependsOn(common)
It can be seen that the JSON dependency is contained in this JAR
akka-http-spray-json_2.11
Told you is was inspired by Spray a fair bit
Server Side
This section will talk about the server side element of Akka.Http
Hosting The Service
To have a correctly formed/hostable server side we need a couple of things in place, namely the following
- An actor system
- A materializer (Akka http uses flows which is the subject of the next and final post)
- An execution context
- Routing
Once we have these things it is really just a question of binding the route to a host name and port.
Shown below is a barebones skeleton of what this may look like
import akka.NotUsed
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.{ContentTypes, HttpEntity}
import akka.http.scaladsl.server.Directives
import akka.stream.scaladsl.Flow
import common.{Item, JsonSupport}
import scala.io.StdIn
import scala.concurrent.Future
import akka.http.scaladsl.model.ws.{Message, TextMessage}
import akka.stream._
import akka.stream.scaladsl._
object Demo extends App with Directives with JsonSupport {
implicit val system = ActorSystem("my-system")
implicit val materializer = ActorMaterializer()
val route = .....
val (host, port) = ("localhost", 8080)
val bindingFuture = Http().bindAndHandle(route, host, port)
bindingFuture.onFailure {
case ex: Exception =>
println(s"$ex Failed to bind to $host:$port!")
}
println(s"Server online at http://localhost:8080/\nPress RETURN to stop...")
StdIn.readLine()
bindingFuture
.flatMap(_.unbind())
.onComplete(_ => system.terminate())
}
We will be looking at the routing DSL separately
Routing DSL
As stated, Akka.Http owes much to Spray, and the routing DSL in particular is practically unchanged from Spray, so it is well worth reading the Spray routing documentation which is available here : http://spray.io/documentation/1.2.4/spray-routing/ and for completeness here is the Akka.Http docs link too : http://doc.akka.io/docs/akka/2.4.7/scala/http/introduction.html#routing-dsl-for-http-servers
There is way too many possible routes to go into for a single post. Lets consider a few basic examples and deconstruct them
Some of these examples do rely on JSON which is the next topic, so for now just understand that there is a way to accept/return JSON.
Lets consider the following use cases
- GET that returns a simple string
- GET that returns a JSON representation of an
Item
- POST that accept a new
Item
In all these cases this is what an Item
looks like
package common
final case class Item(name: String, id: Long)
So lets see the routing DSL that makes the above examples work
val route =
path("hello") {
get {
complete(HttpEntity(
ContentTypes.`text/html(UTF-8)`,
"<h1>Say hello to akka-http</h1>"))
}
} ~
path("randomitem") {
get {
complete(Item("thing", 42))
}
} ~
path("saveitem") {
post {
entity(as[Item]) { item =>
println(s"Server saw Item : $item")
complete(item)
}
}
}
It can be seen that there are some common routing DSL bits and bobs in there, such as:
path
: which satisfies the route name part of the route get
: which tells us that we should go further into the route matching if it’s a GET http request and it matched the path route DSL part post
: which tells us that we should go further into the route matching if it’s a POST http request and it matched the path route DSL part complete
: This is the final result from the route
These parts of the DSL are known as directives. The general anatomy of a directive is as follows:
name(arguments) { extractions =>
...
}
It has a name, zero or more arguments and optionally an inner route (The RouteDirectives are special in that they are always used at the leaf-level and as such cannot have inner routes). Additionally directives can “extract” a number of values and make them available to their inner routes as function arguments. When seen “from the outside” a directive with its inner route form an expression of type Route.
Taken from http://doc.akka.io/docs/akka/2.4.7/scala/http/routing-dsl/directives/index.html#directives up on date 15/11/16
What Directives Do?
A directive can do one or more of the following:
- Transform the incoming RequestContext before passing it on to its inner route (i.e. modify the request)
- Filter the RequestContext according to some logic, i.e. only pass on certain requests and reject others
- Extract values from the RequestContext and make them available to its inner route as “extractions”
- Chain some logic into the RouteResult future transformation chain (i.e. modify the response or rejection)
- Complete the request
This means a Directive completely wraps the functionality of its inner route and can apply arbitrarily complex transformations, both (or either) on the request and on the response side.
Ok so now that we have taken a whistle stop tour of the routing DSL and directives, lets have a look at the few we discussed above
For this work I would strongly recommend the use of the “Postman” google app, which you can grab from here
https://chrome.google.com/webstore/detail/postman/fhbjgbiflinjbdggehcddcbncdddomop?hl=en
GET
We can see this route looks like this
path("hello") {
get {
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`, "<h1>Say hello to akka-http</h1>"))
}
}
So we use the path
, and also the get
directives to establish a get route. We then use complete
to complete the route with some static string representing the html we would like to return
So let’s see this one in postman
GET Item (as JSON)
We can see this route looks like this
path("randomitem") {
get {
complete(Item("thing", 42))
}
}
So again we use the path
/get
directives, but this time we complete
with an Item. This is done due to the JSON support that is able to create the right serialization data for us. We will look at this in the next section
So let’s see this one in postman
POST Item
We can see this route looks like this
path("saveitem") {
post {
entity(as[Item]) { item =>
println(s"Server saw Item : $item")
complete(item)
}
}
}
So again we use the path
directive, but this time we use a post
, where the post expects an item as JSON to be provided. The converting from the incoming JSON string to an Item is done using an Unmarshaller, we will look at this in the next section
So let’s see this one in postman
JSON Support
Akka.http provides JSON support using this library akka-http-spray-json-experimental which you can grab from Maven Central Repo.
JsonProtocol
When using spray we may use the SprayJsonProtocol and DefaultJsonProtocol to create the JSON protcol for your custom objects
Lets consider the Item class we have seen in the demos so far
package common
final case class Item(name: String, id: Long)
This is how we might write the JSON protocol code for this simple class
package common
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport
import spray.json.DefaultJsonProtocol
trait JsonSupport extends SprayJsonSupport with DefaultJsonProtocol {
implicit val itemFormat = jsonFormat2(Item)
}
It can be seen that there are jsonFormatXX
helpers that can be used for very simple cases. In this case jsonFormat2
is used as our item class had 2 parameters
Most of the time this inbuilt helpers are all we need. If however you want something more elaborate you are free to create your own jsonFormat
read / write methods
Marshalling
Marshalling is sprays process of taking objects and create a JSON string representation of them to send across the wire.
The Akka Spray JAR comes with a bunch of default marshallers that allow us to take custom classes and turn them into JSON
These are the most common default marshallers that you will most likely use
type ToEntityMarshaller[T] = Marshaller[T, MessageEntity]
type ToHeadersAndEntityMarshaller[T] = Marshaller[T, (immutable.Seq[HttpHeader], MessageEntity)]
type ToResponseMarshaller[T] = Marshaller[T, HttpResponse]
type ToRequestMarshaller[T] = Marshaller[T, HttpRequest]
You can read more about this here : http://doc.akka.io/docs/akka/2.4.7/scala/http/common/marshalling.html
Luckily you don’t really have to get that involved with these that often as the routing DSL does most of the heavy lifting for you when you do the complete this is taken care of for you providing there is a marshaller that can be found implicitly
Unmarshalling
Unmarshalling is the process of taking the on the wire format (JSON string in these examples) back into a scala class (Item class in this case)
You can read more about this at the official Akka docs page : http://doc.akka.io/docs/akka/2.4.7/scala/http/common/unmarshalling.html
Luckily you don’t really have to get that involved with these that often as the routing DSL does most of the heavy lifting for you, which is what we use this part of the routing DSL, where this will use an unmarshaller to create the Item from the JSON string on the wire
entity(as[Item]) { item =>
WebSockets
Akka Http also supports web sockets too. Lets start this investigation with looking at what is required from the routing DSL perspective, which starts like this
path("websocket") {
get {
handleWebSocketMessages(websocketFlow)
}
} ~
If we look at this special directive a bit more, what exactly does the handleWebSocketMessages
directive look like
Well it looks like this:
def handleWebSocketMessages(handler: Flow[Message, Message, Any]): Route
So we need to supply a flow. A Flow is part of akka reactive streams which will look at in the next part. But for now just be aware that you can create a Flow from a Sink/Source and Materializer to materialize the flow.
For this websocket example here is what the Flow looks like
val (websocketSink, websocketSource) =
MergeHub.source[String].toMat(BroadcastHub.sink[String])(Keep.both).run()
val websocketFlow: Flow[Message, Message, NotUsed] =
Flow[Message].mapAsync(1) {
case TextMessage.Strict(text) => Future.successful(text)
case streamed: TextMessage.Streamed => streamed.textStream.runFold("")(_ ++ _)
}.via(Flow.fromSinkAndSource(websocketSink, websocketSource))
.map[Message](string => TextMessage(string))
The idea is that when a websocket client connects and sends an initial message they will get a reply TextMessage
sent over the websocket to them
This uses some pretty new akka stream stages namely
MergeHub
: Creates a Source that emits elements merged from a dynamic set of producers. Broadcast
: Emit each incoming element each of n outputs
Lets start by running the server, and then opening the “WebSocketTestClient.html” page which should look like this
Once the page is open, type something in the textbox and hit the “Send” button, you should see this
All fairly normal socket type stuff so far, we send a message from the web page client side to the server and the server responds with the text we sent.
But what about if we wanted to send message to the client on demand, say from another route which could be a command to do some work, which notifies the clients of the websocket?
With this Flow in place, we are also able to push back messages to the client end of the websocket.
Lets see another route which will simulate some work, which results in messages being sent down the websocket back to the client (if its still connected)
Here is the route
path("sendmessagetowebsocket" / IntNumber) { msgCount =>
post {
for(i <- 0 until msgCount)
{
Source.single(s"sendmessagetowebsocket $i").runWith(websocketSink)
}
complete("done")
}
}
It can be seen that we simply create a new source which is run with the existing Sink that was part of the Flow used by the websocket
Here is what this would look like in postman
And here is what the web page client side websocket example looks like after this route has been called as above
Client Side
Akka http support comes with 3 types of client API that one can use
- Connection level client API
- Host level client API
- Request level client API
In this article I will only be using the last of these APIs, as in my opinion it is the most sensible client side choice.
So what does the request level client API look like.
GET
If we consider that we want to conduct this request
http://localhost:8080/randomitem
which when run via postman gives the following JSON response
So lets see what the code looks like to do this using the request level client API
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import concurrent.ExecutionContext.Implicits.global
import common.{Item, JsonSupport}
import concurrent.duration._
import scala.io.StdIn
class RegularRoutesDemo extends JsonSupport {
def Run() : Unit = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val httpClient = Http().outgoingConnection(host = "localhost", port = 8080)
val randomItemUrl = s"""/randomitem"""
val flowGet : Future[Item] =
Source.single(
HttpRequest(
method = HttpMethods.GET,
uri = Uri(randomItemUrl))
)
.via(httpClient)
.mapAsync(1)(response => Unmarshal(response.entity).to[Item])
.runWith(Sink.head)
val start = System.currentTimeMillis()
val result = Await.result(flowGet, 5 seconds)
val end = System.currentTimeMillis()
println(s"Result in ${end-start} millis: $result")
}
}
There are a couple of take away points in the code above
- We use a Source which is a HttpRequest, where we can specify the HTTP verb and other request type things
- We use Unmarshal to convert the incoming JSON string to an Item. We discussed Marshalling/Unmarshalling above.
- This obviously relies on the Spray JSON support that we discussed above
POST
If we consider that we want to conduct this request
http://localhost:8080/saveitem
which when run via postman gives the following JSON response
So lets see what the code looks like to do this using the request level client API
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.marshalling.Marshal
import akka.http.scaladsl.model._
import akka.http.scaladsl.unmarshalling.Unmarshal
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import scala.concurrent.{Await, Future}
import concurrent.ExecutionContext.Implicits.global
import common.{Item, JsonSupport}
import concurrent.duration._
import scala.io.StdIn
class RegularRoutesDemo extends JsonSupport {
def Run() : Unit = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
val httpClient = Http().outgoingConnection(host = "localhost", port = 8080)
val saveItemUrl = s"""/saveitem"""
val itemToSave = Item("newItemHere",12)
val flowPost = for {
requestEntity <- Marshal(itemToSave).to[RequestEntity]
response <-
Source.single(
HttpRequest(
method = HttpMethods.POST,
uri = Uri(saveItemUrl),
entity = requestEntity)
)
.via(httpClient)
.mapAsync(1)(response => Unmarshal(response.entity).to[Item])
.runWith(Sink.head)
} yield response
val startPost = System.currentTimeMillis()
val resultPost = Await.result(flowPost, 5 seconds)
val endPost = System.currentTimeMillis()
println(s"Result in ${endPost-startPost} millis: $resultPost")
}
}
The only thing that is different this time, is that we need to pass a JSON string representation of an Item which we pass to the HttpRequest.
This is done use a JSON marshaller which must be in scope implicitly.
Where Can I Find The Code Examples?
I will be augmenting this GitHub repo with the example projects as I move through this series
https://github.com/sachabarber/SachaBarber.AkkaExamples