Last Time
So last time, we walked through the Rating Kafka Streams architecture and also showed how we can query the local state stores. I also stated that the standard KafkaProducer used in the last post was more for demonstration purposes, and that long term we would like to swap it out for a Play Framework REST endpoint that lets us publish a message straight from our app to the Kafka rating stream processing topology.
PreAmble
Just as a reminder, this is part of my ongoing set of posts, which I talk about here, where we will be building up to a point where we have a full app using lots of different technologies, such as:
- WebPack
- React.js
- React Router
- TypeScript
- Babel.js
- Akka
- Scala
- Play (Scala Http Stack)
- MySql
- SBT
- Kafka
- Kafka Streams
Ok, so now that we have the introductions out of the way, let's crack on with what we want to cover in this post.
Where is the Code?
As usual, the code is on GitHub at https://github.com/sachabarber/MadCapIdea.
What Is This Post All About?
As stated in the last post, “Kafka interactive queries”, we used a standard KafkaProducer to simulate what would have come from the end user via the Play Framework API. This time, we will build out the Play Framework side of things to include the ability to produce “rating” objects that are consumed via the rating Kafka Streams processing topology introduced in the last post.
SBT
So we already had an SBT file inside the PlayBackEndApi project, but we need to expand it to include support for a couple of things:
- Reactive Kafka
- Jackson JSON (Play already comes with its own JSON support, but for the Kafka serialization/deserialization (Serdes) I wanted to make sure it was the same as in the Kafka Streams project)
This means these additions to the build.sbt file:
val configVersion = "1.0.1"
libraryDependencies ++= Seq(
"org.reactivemongo" %% "play2-reactivemongo" % "0.11.12",
"com.typesafe.akka" % "akka-stream-kafka_2.11" % "0.17",
"org.skinny-framework.com.fasterxml.jackson.module" % "jackson-module-scala_2.11" % "2.8.4",
"com.typesafe" % "config" % configVersion
)
Topics
We also want to ensure that we are using the same topics as the stream processing topology, so I have just replicated that class (in reality, I should have made this stuff a common JAR, but meh).
package kafka.topics
object RatingsTopics {
val RATING_SUBMIT_TOPIC = "rating-submit-topic"
val RATING_OUTPUT_TOPIC = "rating-output-topic"
}
Routing
As this is essentially a new route that will be called by the front-end React app when a new Rating is given, we obviously need a new route/controller. The route is fairly simple and is as follows:
# Rating page
POST /rating/submit/new controllers.RatingController.submitNewRating()
JSON
The new Rating route expects a Rating object to be provided as JSON in the POST body. Here is the actual Rating object and the Play JSON handling for it:
package Entities
import play.api.libs.json._
import play.api.libs.functional.syntax._
case class Rating(fromEmail: String, toEmail: String, score: Float)
object Rating {
implicit val formatter = Json.format[Rating]
}
object RatingJsonFormatters {
implicit val ratingWrites = new Writes[Rating] {
def writes(rating: Rating) = Json.obj(
"fromEmail" -> rating.fromEmail,
"toEmail" -> rating.toEmail,
"score" -> rating.score
)
}
implicit val ratingReads: Reads[Rating] = (
(JsPath \ "fromEmail").read[String] and
(JsPath \ "toEmail").read[String] and
((JsPath \ "score").read[Float])
)(Rating.apply _)
}
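To make the expected payload concrete, here is a quick round-trip through these formatters (a throwaway sketch, not part of the repo; the email values are the ones used later in this post):

import play.api.libs.json._
import Entities._
import Entities.RatingJsonFormatters._

object RatingJsonDemo extends App {
  val rating = Rating("sacha@here.com", "henry@there.com", 4.5f)

  // What the React front end would POST to /rating/submit/new
  val json: JsValue = Json.toJson(rating)
  println(json)
  // {"fromEmail":"sacha@here.com","toEmail":"henry@there.com","score":4.5}

  // What the controller does with the incoming body
  json.validate[Rating] match {
    case JsSuccess(parsed, _) => println(s"Parsed ok: $parsed")
    case JsError(errors)      => println(s"Bad JSON: $errors")
  }
}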
Controller
So now that we have a route, let's turn our attention to the new RatingController, which right now, to just accept a new Rating, looks like this:
package controllers
import javax.inject.Inject
import Actors.Rating.RatingProducerActor
import Entities.RatingJsonFormatters._
import Entities._
import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import play.api.libs.json._
import play.api.mvc.{Action, Controller}
import utils.Errors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import scala.concurrent.duration._
class RatingController @Inject()
(
implicit actorSystem: ActorSystem,
ec: ExecutionContext
) extends Controller
{
val decider: Supervision.Decider = {
case _ => Supervision.Restart
}
implicit val mat = ActorMaterializer(
ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))
val childRatingActorProps = Props(classOf[RatingProducerActor],mat,ec)
val rand = new Random()
val ratingSupervisorProps = BackoffSupervisor.props(
Backoff.onStop(
childRatingActorProps,
childName = s"RatingProducerActor_${rand.nextInt()}",
minBackoff = 3.seconds,
maxBackoff = 30.seconds,
randomFactor = 0.2
).withSupervisorStrategy(
OneForOneStrategy() {
case _ => SupervisorStrategy.Restart
})
)
val ratingSupervisorActorRef =
actorSystem.actorOf(
ratingSupervisorProps,
name = "ratingSupervisor"
)
def submitNewRating = Action.async(parse.json) { request =>
Json.fromJson[Rating](request.body) match {
case JsSuccess(newRating, _) => {
ratingSupervisorActorRef ! newRating
Future.successful(Ok(Json.toJson(
newRating.copy(toEmail = newRating.toEmail.toUpperCase))))
}
case JsError(errors) =>
Future.successful(BadRequest(
"Could not build a Rating from the json provided. " +
Errors.show(errors)))
}
}
}
The main points from the code above are:
- We use the standard Play Framework JSON handling for the marshalling/unmarshalling to and from JSON.
- The controller route is async (see how it returns a Future[T]).
- We will not process anything if the JSON is invalid (the utils.Errors helper used to report this is sketched just after this list).
- The RatingController creates a supervisor actor that will supervise the creation of another actor, namely a RatingProducerActor. It may look like this happens each time the RatingController is instantiated, which is true. However, this only happens once, as there is only one router in Play and the controllers are created by the router. You can read more about this here. The short story is that the supervisor is created once, and the actor it supervises will be created using a BackoffSupervisor, where the creation of the actor will be retried using an incremental back-off strategy. We also use the OneForOneStrategy to ensure only the single failed child actor is affected by the supervisor.
- This controller is also responsible for creating an ActorMaterializer with a supervision strategy (more on this in the next section). The ActorMaterializer is used to create actors within Akka Streams workflows.
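The utils.Errors helper used by the controller is not listed in this post; a minimal sketch of what it could look like (assuming Play 2.5, where JSON validation errors are play.api.data.validation.ValidationError) is:

package utils

import play.api.data.validation.ValidationError
import play.api.libs.json.JsPath

// A sketch of the Errors helper: flattens Play's JSON validation
// errors into a single readable string for the BadRequest response
object Errors {
  def show(errors: Seq[(JsPath, Seq[ValidationError])]): String =
    errors.map { case (path, validationErrors) =>
      s"$path: ${validationErrors.map(_.message).mkString(", ")}"
    }.mkString("; ")
}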
RatingProducerActor
The final part of the pipeline for this post is obviously to be able to write a Rating to a Kafka topic, via a Kafka producer. As already stated, I chose to use Reactive Kafka (the Akka Streams Kafka producer), which builds upon Akka Streams ideas, where we have Sinks/Sources/Flows/RunnableGraph - all the good stuff. So here is the full code for the actor:
package Actors.Rating
import Entities.Rating
import Serialization.JSONSerde
import akka.Done
import akka.actor.{Actor, PoisonPill}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.{ActorMaterializer, KillSwitches}
import akka.stream.scaladsl.{Keep, MergeHub, Source}
import kafka.topics.RatingsTopics
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import utils.Settings
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
class RatingProducerActor(
implicit materializer: ActorMaterializer,
ec: ExecutionContext
) extends Actor {
val jSONSerde = new JSONSerde[Rating]
val ratingProducerSettings = ProducerSettings(
context.system,
new StringSerializer,
new ByteArraySerializer)
.withBootstrapServers(Settings.bootStrapServers)
.withProperty(Settings.ACKS_CONFIG, "all")
val ((mergeHubSink, killswitch), kafkaSourceFuture) =
MergeHub.source[Rating](perProducerBufferSize = 16)
.map(rating => {
val ratingBytes = jSONSerde.serializer().serialize("", rating)
(rating, ratingBytes)
})
.map { ratingWithBytes =>
val (rating, ratingBytes) = ratingWithBytes
new ProducerRecord[String, Array[Byte]](
RatingsTopics.RATING_SUBMIT_TOPIC, rating.toEmail, ratingBytes)
}
.viaMat(KillSwitches.single)(Keep.both)
.toMat(Producer.plainSink(ratingProducerSettings))(Keep.both)
.run()
kafkaSourceFuture.onComplete {
case Success(value) => println(s"Got the callback, value = $value")
case Failure(e) => self ! PoisonPill
}
override def postStop(): Unit = {
println(s"RatingProducerActor seen 'Done'")
killswitch.shutdown()
super.postStop()
}
override def receive: Receive = {
case (rating: Rating) => {
println(s"RatingProducerActor seen ${rating}")
Source.single(rating).runWith(mergeHubSink)
}
case Done => {
println(s"RatingProducerActor seen 'Done'")
killswitch.shutdown()
self ! PoisonPill
}
}
}
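Two helpers used above are not listed in this post: JSONSerde[Rating] (which came from the Kafka Streams project) and utils.Settings. For completeness, here are minimal sketches of what they could look like; the real classes in the repo may differ:

package Serialization

import java.util
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}

// A Jackson-backed Serde sketch: serializes any T to/from JSON bytes,
// using the jackson-module-scala dependency added to build.sbt earlier
class JSONSerde[T](implicit m: Manifest[T]) extends Serde[T] {

  private val mapper = new ObjectMapper()
  mapper.registerModule(DefaultScalaModule)

  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()

  override def serializer(): Serializer[T] = new Serializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def serialize(topic: String, data: T): Array[Byte] =
      mapper.writeValueAsBytes(data)
    override def close(): Unit = ()
  }

  override def deserializer(): Deserializer[T] = new Deserializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def deserialize(topic: String, data: Array[Byte]): T =
      mapper.readValue(data, m.runtimeClass).asInstanceOf[T]
    override def close(): Unit = ()
  }
}

And the Settings object, which in reality would read these values from the Typesafe Config dependency we added to build.sbt:

package utils

import org.apache.kafka.clients.producer.ProducerConfig

// A sketch of the Settings helper; values are hard coded here for brevity
object Settings {
  val bootStrapServers = "localhost:9092"
  val ACKS_CONFIG: String = ProducerConfig.ACKS_CONFIG // the "acks" producer property key
}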
I’ll be honest, there is a fair bit going on in that actor code, so let's dissect it. What exactly is happening?
- The most important point is that we simply use the actor as a vessel to host a Reactive Kafka Akka Streams RunnableGraph, representing a graph of MergeHub -> Reactive Kafka producer sink. This is completely fine and a normal thing to do. Discussing Akka Streams is out of scope for this post, but if you want to know more, you can read a previous post I did here.
- So we now know this actor hosts a stream, but the stream could fail, or the actor could fail. What we want is: if the actor fails, the stream is stopped, and if the stream fails, the actor is stopped. To do that, we need to do a couple of things:
- STREAM FAILING: Since the RunnableGraph materializes a Future[T], we can hook a Success/Failure callback onto that and, on failure, send a PoisonPill to the hosting actor. The supervisor actor we saw above would then kick in and try to create a new instance of this actor. Another thing to note is that the stream hosted in this actor uses the ActorMaterializer that was supplied by the RatingController, where we provided a restart supervision strategy for the stream.
- ACTOR FAILING: If the actor itself fails, the Akka framework will call the postStop() method, at which point we want to shut down the stream hosted within this actor. So how can we shut down the hosted stream? In the middle of the stream setup there is this line: .viaMat(KillSwitches.single)(Keep.both) - this allows us to get a KillSwitch from the materialized values of the stream. Once we have a KillSwitch, we can simply call its shutdown() method.
- BELTS AND BRACES: I have also provided a way for the outside world to shut down this actor and its hosted stream, by sending the actor a Done message. I have not wired this up yet, but the hook is there to demonstrate how you could do this (see the sketch just below).
- We can see that there is a MergeHub source, which allows external code to push stuff through the MergeHub via the Sink value it materializes within the actor.
- We can also see that the Rating object the actor receives is indeed pushed into the MergeHub materialized Sink by this actor, and then some transformation is done on it to grab its raw bytes.
- We can see that the final stage in the RunnableGraph is the Reactive Kafka Producer.plainSink, which results in each Rating object this actor pushes into the stream being written out to a Kafka topic.
And I think that is the main set of points about how this actor works.
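As mentioned in the BELTS AND BRACES point, driving the hosted stream from the outside is just a matter of sending messages. A quick sketch (relying on the BackoffSupervisor forwarding messages to its current child):

import akka.Done
import Entities.Rating

// The BackoffSupervisor forwards messages to its current child, so we can
// talk to the RatingProducerActor through the supervisor ref created in
// the RatingController
ratingSupervisorActorRef ! Rating("sacha@here.com", "henry@there.com", 4.5f)

// The belts-and-braces hook: Done shuts down the hosted stream
// and stops the actor
ratingSupervisorActorRef ! Done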
The End Result
Just to prove that this is all working, here is a screenshot of the new RatingController endpoint http://localhost:9000/rating/submit/new being called with a JSON payload representing the Rating.
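If you want to poke the endpoint yourself without the React front end, something like this does the trick (a sketch using Play 2.5's WS API, with the usual injected WSClient):

import javax.inject.Inject
import play.api.libs.json.Json
import play.api.libs.ws.WSClient
import scala.concurrent.ExecutionContext

// A throwaway client for the new rating route
class RatingSubmitDemo @Inject()(ws: WSClient)(implicit ec: ExecutionContext) {
  def submit(): Unit =
    ws.url("http://localhost:9000/rating/submit/new")
      .post(Json.obj(
        "fromEmail" -> "sacha@here.com",
        "toEmail"   -> "henry@there.com",
        "score"     -> 4.5
      ))
      .foreach(response => println(s"${response.status} ${response.body}"))
}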
And here is the Akka Http endpoint that queries the Kafka Stream state store(s).
http://localhost:8080/ratingByEmail?email=sacha@here.com gives an empty list as we have NOT sent any Rating through for email “sacha@here.com” yet.
http://localhost:8080/ratingByEmail?email=henry@there.com gives 1 result, matching the number of Rating(s) I had created at that point.
http://localhost:8080/ratingByEmail?email=henry@there.com gives 3 results, again matching the number of Rating(s) I had created by then.
So that’s cool; this means that we have successfully integrated publishing of a JSON payload Rating object through Kafka to the Kafka Streams Rating processor… happy days!
Conclusion
Straight after the last article, I decided to Dockerize everything (a decision I have now reversed, due to the flaky nature of Docker's depends_on and it not truly waiting for the item depended on, even when using “condition: service_healthy” and a “healthcheck” test, etc.), and some code must have become corrupt, as stuff from the last post stopped working.
An example from the Docker-Compose docs being:
version: '2.1'
services:
  web:
    build: .
    depends_on:
      db:
        condition: service_healthy
      redis:
        condition: service_started
  redis:
    image: redis
  db:
    image: redis
    healthcheck:
      test: "exit 0"
I love Docker, but for highly complex setups I think you are better off using a Docker Compose file while not trying to bring it all up in one go. I may bring the Docker bits back into the fold for anyone reading this who wants to play around, but I will have to think about that closely.
Once I realized that my Docker campaign was doing more harm than good, and I reverted back to my extremely hand-coded but deterministic PowerShell startup script, I found that getting the Play Framework and a Reactive Kafka (Akka Streams Kafka) producer up and running was quite simple, and it kind of worked like a charm first time. Yay!
Next Time
Next time, we should be able to make the entire rating view page work, as we now have the following things:
- A Play Framework REST endpoint that lets the front end submit a new Rating into the Kafka Streams topology
- An Akka Http endpoint that queries the Kafka Streams state store(s) for the Rating(s) for a given email
So we should quite easily be able to turn that data into a simple Bootstrap table in the React portion of this application.