Last Time
Last time, we looked at finishing the “View Rating” page, which queried the Kafka Streams stores via a new endpoint in the Play backend. This post will see us finish the workings of the “Create Job” page.
Preamble
Just as a reminder, this is part of my ongoing series of posts, which I talk about here, where we are building up to a full app that uses lots of different technologies, such as these:
- WebPack
- React.js
- React Router
- TypeScript
- Babel.js
- Akka
- Scala
- Play (Scala Http Stack)
- MySql
- SBT
- Kafka
- Kafka Streams
Ok, so now that we have the introductions out of the way, let's crack on with what we want to cover in this post.
Where is the Code?
As usual, the code is on GitHub here.
What Is This Post All About?
As stated above, this post deals with the “Create Job” functionality:
- Allow the user to specify their geo location position on the map
- Create a JSON Job object
- Send it to a new Play backend endpoint
- Publish the Job out over a Kafka topic (using Akka Streams / Reactive Kafka)
- Consume the Job over a Kafka topic (using Akka Streams / Reactive Kafka)
- Push the consumed Job out of the forever frame (Comet functionality in the Play backend)
- Have a new RxJs based Observable over the Comet based forever frame, and ensure that it is working
As you can see, this post actually covers a lot. In fact, it can be thought of as the linchpin of this entire project. With the code contained in this post, we have everything we need to tackle the rather more complex visual aspects of the “View Job” page.
Initially, I was thinking I would create separate Kafka streams (topics) for all the different interactions such as:
- Bid for job
- Accept job
- Complete job
- Cancel job
- Create new job geo location
But then I came to my senses and realized this could all be achieved using a single stream, that is, a Job stream. The idea is that the JSON payload on that single stream would just hold slightly different state at different points in time.
To see what I mean by this, here is the actual Scala class that we will be turning to/from JSON, which is sent out over the SINGLE Kafka topic:
case class Job
(
  jobUUID: String,
  clientFullName: String,
  clientEmail: String,
  driverFullName: String,
  driverEmail: String,
  vehicleDescription: String,
  vehicleRegistrationNumber: String,
  isAssigned: Boolean,
  isCompleted: Boolean
)
With this simple Scala object, we can do everything we want, for example:
- To determine whether this is a new job that a driver may be assigned to, we just look for Job items that don't yet have a “driverEmail” - this tells us the job is free and has no driver yet
- To determine whether this job has been accepted by a client, we just filter/examine the “isAssigned” property, which may be true/false
Anyway, you get the idea.
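To make that concrete, here is a small, purely illustrative sketch (these helper names are mine, not the project's) of how the single payload can be interpreted at different points in a job's lifecycle:
object JobLifecycle {
  // No driver yet: the job is free for any driver to bid on
  def isUnassigned(job: Job): Boolean = job.driverEmail.isEmpty

  // A driver has been assigned but the work is not yet finished
  def isInProgress(job: Job): Boolean = job.isAssigned && !job.isCompleted

  // The job has run to completion
  def isFinished(job: Job): Boolean = job.isCompleted
}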
So now that we know there is a single stream, let's proceed with the rest of the post, shall we?
What Are We Trying To Do in This Post?
It really is one continuous chain: a single process that follows, in order, the sequential steps listed above. Let's walk through each of them.

Allow the User to Specify Their Geo Location Position on the Map
Before we send the actual job JSON payload, we need to allow the user to specify their position, such that the position can be retrieved later. (Right now, the position is maintained in session storage, not in the Job payload; I may include client/driver positions in the actual payload at some point, we'll see how that goes.)
Once the client sets their OWN position, they are able to create and push out a new job. If they already have a job in flight, the client is NOT able to create a new one.
Thanks to the React map component that was picked some time ago, the position update really just boils down to this code in the CreateJob.tsx file:
_handleMapClick = (event) => {
    // Store the clicked lat/lng as the user's current position in component state
    const newState = Object.assign({}, this.state, {
        currentPosition: new Position(event.latLng.lat(), event.latLng.lng())
    })
    this.setState(newState)
}
To deal with the user's current position, I also created a simple Position domain class and this PositionService:
export class Position {
    lat: number;
    lng: number;

    constructor(lat: number, lng: number) {
        this.lat = lat;
        this.lng = lng;
    }
}

import { injectable, inject } from "inversify";
import { Position } from "../domain/Position";

@injectable()
export class PositionService {

    constructor() {
    }

    clearUserPosition = (email: string): void => {
        let key = 'currentUserPosition_' + email;
        sessionStorage.removeItem(key);
    }

    storeUserPosition = (currentUser: any, position: Position): void => {
        if (currentUser == null || currentUser == undefined)
            return;
        if (position == null || position == undefined)
            return;
        let currentUsersPosition = {
            currentUser: currentUser,
            position: position
        }
        let key = 'currentUserPosition_' + currentUser.email;
        sessionStorage.setItem(key, JSON.stringify(currentUsersPosition));
    }

    currentPosition = (email: string): Position => {
        let key = 'currentUserPosition_' + email;
        var currentUsersPosition = JSON.parse(sessionStorage.getItem(key));
        // Guard against a missing entry rather than blowing up on null
        return currentUsersPosition == null ? null : currentUsersPosition.position;
    }

    hasPosition = (email: string): boolean => {
        let key = 'currentUserPosition_' + email;
        var currentUsersPosition = JSON.parse(sessionStorage.getItem(key));
        return currentUsersPosition != null && currentUsersPosition != undefined;
    }
}
Create a JSON Job Object
The next step is to create a Job object that may be POSTed to the Play backend for the new Job. This is done via a standard jQuery POST, as follows:
_handleCreateJobClick = () => {
    var self = this;
    var currentUser = this._authService.user();
    var newJob = {
        clientFullName: currentUser.fullName,
        clientEmail: currentUser.email,
        driverFullName: '',
        driverEmail: '',
        vehicleDescription: '',
        vehicleRegistrationNumber: '',
        isAssigned: false,
        isCompleted: false
    }
    $.ajax({
        type: 'POST',
        url: 'job/submit',
        data: JSON.stringify(newJob),
        contentType: "application/json; charset=utf-8",
        dataType: 'json'
    })
    .done(function (jdata, textStatus, jqXHR) {
        self._jobService.storeUserIssuedJob(newJob);
        const newState = Object.assign({}, self.state, {
            hasIssuedJob: self._jobService.hasIssuedJob()
        });
        // Apply the new state before we navigate away
        self.setState(newState);
        self._positionService.storeUserPosition(currentUser, self.state.currentPosition);
        hashHistory.push('/viewjob');
    })
    .fail(function (jqXHR, textStatus, errorThrown) {
        const newState = Object.assign({}, self.state, {
            okDialogHeaderText: 'Error',
            okDialogBodyText: jqXHR.responseText,
            okDialogOpen: true,
            okDialogKey: Math.random()
        })
        self.setState(newState)
    });
}
This will also store the user's current position using the PositionService we just saw (this page is only available to clients, as it's all about creating new jobs, which drivers can't do). On successfully sending a new job, we redirect the user to the “ViewJob” page.
Send It to a New Play Backend Endpoint
There is a new endpoint to support Job creation, so obviously we need a new route entry:
POST /job/submit controllers.JobController.submitJob()
Publish the Job Out Over a Kafka Topic (using Akka Streams / Reactive Kafka)
Ok, so we now know that we have a new endpoint that can accept a “Job” JSON object. What does it do with this Job JSON object? Well, quite simply, it does this:
- Converts the JSON into a Scala object (via the project's JSON formatters; see the sketch below)
- Sends it out over Kafka using a Reactive Kafka publisher
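The JSON-to-Scala conversion relies on the project's own JobJsonFormatters (you can see the import in the controller code below). A minimal sketch of what such a formatter might look like, using Play's JSON macros (the sketch name and the comment about jobUUID are my assumptions, not the project's actual code):
import play.api.libs.json.{Format, Json}

object JobJsonFormattersSketch {
  // Macro-generated Format derives both Reads and Writes from the Job case class.
  // Note: the client-side code above never sends a jobUUID, so the real
  // formatters presumably handle that field more leniently than this plain
  // macro would.
  implicit val jobFormat: Format[Job] = Json.format[Job]
}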
You may be asking yourself why we want to burden ourselves with Kafka here at all, if all we are going to do is take a Job JSON payload in, send it out via Kafka, only to have it come back in via Kafka. This seems weird; why bother? The reason we want to involve Kafka here is for the audit and commit log facility that it provides. We want a record of the events, and that's exactly what Kafka gives us: a nice append-only log.
Anyway, what does the new endpoint code that accepts the job look like? Here it is:
package controllers
import javax.inject.Inject
import entities.Job
import entities.JobJsonFormatters._
import entities._
import actors.job.{JobConsumerActor, JobProducerActor}
import akka.actor.{ActorSystem, OneForOneStrategy, Props, SupervisorStrategy}
import akka.pattern.{Backoff, BackoffSupervisor}
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub}
import akka.stream.{ActorMaterializer, ActorMaterializerSettings, Supervision}
import play.api.http.ContentTypes
import play.api.libs.Comet
import play.api.libs.json._
import play.api.libs.json.Json
import play.api.libs.json.Format
import play.api.libs.json.JsSuccess
import play.api.libs.json.Writes
import play.api.mvc.{Action, Controller}
import utils.Errors
import scala.concurrent.{ExecutionContext, Future}
import scala.util.Random
import scala.concurrent.duration._
class JobController @Inject()
(
  implicit actorSystem: ActorSystem,
  ec: ExecutionContext
) extends Controller
{
  val rand = new Random()

  // Restart the stream on any failure
  val decider: Supervision.Decider = {
    case _ => Supervision.Restart
  }

  implicit val mat = ActorMaterializer(
    ActorMaterializerSettings(actorSystem).withSupervisionStrategy(decider))

  // Many producers may write to 'sink'; everything is broadcast out via 'source'
  val (sink, source) =
    MergeHub.source[JsValue](perProducerBufferSize = 16)
      .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
      .run()

  val childJobProducerActorProps = Props(classOf[JobProducerActor], mat, ec)
  val jobProducerSupervisorProps = createBackoffSupervisor(childJobProducerActorProps,
    s"JobProducerActor_${rand.nextInt()}")
  val jobProducerSupervisorActorRef = actorSystem.actorOf(jobProducerSupervisorProps,
    name = "jobProducerSupervisor")

  val childJobConsumerActorProps = Props(new JobConsumerActor(sink)(mat, ec))
  val jobConsumerSupervisorProps = createBackoffSupervisor(childJobConsumerActorProps,
    s"JobConsumerActor_${rand.nextInt()}")
  val jobConsumerSupervisorActorRef = actorSystem.actorOf(jobConsumerSupervisorProps,
    name = "jobConsumerSupervisor")

  jobConsumerSupervisorActorRef ! Init

  // The Comet forever frame endpoint: pushes each JsValue to parent.jobChanged
  def streamedJob() = Action {
    Ok.chunked(source via Comet.json("parent.jobChanged")).as(ContentTypes.HTML)
  }

  // POST endpoint: parse the incoming JSON into a Job and hand it to the producer actor
  def submitJob = Action.async(parse.json) { request =>
    Json.fromJson[Job](request.body) match {
      case JsSuccess(job, _) => {
        jobProducerSupervisorActorRef ! job
        Future.successful(Ok(Json.toJson(job.copy(clientEmail = job.clientEmail.toUpperCase))))
      }
      case JsError(errors) =>
        Future.successful(BadRequest("Could not build a Job from the json provided. " +
          Errors.show(errors)))
    }
  }

  private def createBackoffSupervisor(childProps: Props, actorChildName: String): Props = {
    BackoffSupervisor.props(
      Backoff.onStop(
        childProps,
        childName = actorChildName,
        minBackoff = 3.seconds,
        maxBackoff = 30.seconds,
        randomFactor = 0.2
      ).withSupervisorStrategy(
        OneForOneStrategy() {
          case _ => SupervisorStrategy.Restart
        })
    )
  }
}
There is a fair bit going on in that code. Let's dissect it a bit.
- We create a backoff supervisor for both the Kafka producer and consumer actors
- We create a stream that is capable of writing to the Comet forever frame
- We provide the sink side (MergeHub) of the stream to the consumer actor, such that when it reads a value from Kafka, the value is pumped into the sink, travels through the Akka stream via the BroadcastHub and the Comet forever frame back to the HTML (and ultimately to an RxJs Subject); if the MergeHub/BroadcastHub combination is new to you, see the standalone sketch after this list
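Here is a tiny self-contained sketch of the same pattern, independent of this project (all names here are illustrative): anything pushed into the merged sink is fanned out to every subscriber of the broadcast source.
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{BroadcastHub, Keep, MergeHub, Sink, Source}

object HubSketch extends App {
  implicit val system = ActorSystem("hub-sketch")
  implicit val mat = ActorMaterializer()

  // Many producers may attach to 'sink'; everything they emit
  // is fanned out to every consumer attached to 'source'
  val (sink, source) =
    MergeHub.source[String](perProducerBufferSize = 16)
      .toMat(BroadcastHub.sink(bufferSize = 256))(Keep.both)
      .run()

  // A subscriber sees everything pushed into the hub
  source.runWith(Sink.foreach(msg => println(s"subscriber saw: $msg")))

  // Any number of producers can push into the hub
  Source(List("hello", "world")).runWith(sink)
}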
Push the Consumed Job Out of the Forever Frame (Comet Functionality in Play Backend)
Ok, so we just saw how the two actors are created under backoff supervisors, and how the consumer (the one that reads from Kafka) gets the ability to essentially write back to the forever frame in the HTML.
So how does the job go out into Kafka land?
That part is quite simple, here it is:
package actors.job
import kafka.topics.JobTopics
import serialization.JSONSerde
import akka.Done
import akka.actor.{Actor, PoisonPill}
import akka.kafka.ProducerSettings
import akka.kafka.scaladsl.Producer
import akka.stream.scaladsl.{Keep, MergeHub, Source}
import akka.stream.{ActorMaterializer, KillSwitches}
import entities.Job
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArraySerializer, StringSerializer}
import utils.Settings
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
class JobProducerActor(
  implicit materializer: ActorMaterializer,
  ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Job]

  val jobProducerSettings = ProducerSettings(
    context.system,
    new StringSerializer,
    new ByteArraySerializer)
    .withBootstrapServers(s"${Settings.bootStrapServers}")

  // A long-lived stream: jobs pushed into the MergeHub are serialized to JSON
  // bytes and written to the Kafka topic, keyed by the client's email
  val ((mergeHubSink, killswitch), kafkaSourceFuture) =
    MergeHub.source[Job](perProducerBufferSize = 16)
      .map(job => {
        val jobBytes = jSONSerde.serializer().serialize("", job)
        (job, jobBytes)
      })
      .map { jobWithBytes =>
        val (job, jobBytes) = jobWithBytes
        new ProducerRecord[String, Array[Byte]](
          JobTopics.JOB_SUBMIT_TOPIC, job.clientEmail, jobBytes)
      }
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Producer.plainSink(jobProducerSettings))(Keep.both)
      .run()

  kafkaSourceFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => {
      self ! PoisonPill
    }
  }

  override def postStop(): Unit = {
    super.postStop()
    println(s"JobProducerActor seen 'Done'")
    killswitch.shutdown()
  }

  override def receive: Receive = {
    case (job: Job) => {
      println(s"JobProducerActor seen ${job}")
      // Push the incoming job into the long-lived Kafka-producing stream
      Source.single(job).runWith(mergeHubSink)
    }
    case Done => {
      println(s"JobProducerActor seen 'Done'")
      killswitch.shutdown()
      self ! PoisonPill
    }
  }
}
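The JSONSerde[Job] used above comes from the project's serialization package, which is not shown in this post. Conceptually, it is just a Kafka Serde backed by Play JSON; a minimal sketch of such a thing (my own illustrative version, assuming a Play JSON Format is in implicit scope) might look like this:
import java.util
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
import play.api.libs.json.{Format, Json}

// A Kafka Serde that round-trips T through its Play JSON Format
class PlayJsonSerde[T >: Null](implicit format: Format[T]) extends Serde[T] {
  override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
  override def close(): Unit = ()

  override def serializer(): Serializer[T] = new Serializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def close(): Unit = ()
    override def serialize(topic: String, data: T): Array[Byte] =
      Json.toJson(data).toString.getBytes("UTF-8")
  }

  override def deserializer(): Deserializer[T] = new Deserializer[T] {
    override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
    override def close(): Unit = ()
    override def deserialize(topic: String, data: Array[Byte]): T =
      if (data == null) null else Json.parse(data).as[T]
  }
}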
We covered a lot of how this works in the last post, when we talked about how to create a new Rating. The mechanism is essentially the same, but this time for Job JSON data.
Consume the Job Over a Kafka Topic (using Akka Streams / Reactive Kafka)
Let's see the JobConsumerActor, which takes this Sink (the MergeHub from the JobController) and pushes values out to it whenever it sees a new value from Kafka on the “job-submit-topic” topic. The value then travels through the Akka stream, via the BroadcastHub, out to the forever frame in the HTML.
Here is the code. It may look scary, but really it's just reading a value off the Kafka topic and pushing it out via the Sink (MergeHub):
package actors.job
import entities.{Job, Init}
import kafka.topics.JobTopics
import serialization.JSONSerde
import akka.{Done, NotUsed}
import akka.actor.{Actor, ActorSystem, PoisonPill}
import akka.kafka.{ConsumerSettings, ProducerSettings, Subscriptions}
import akka.kafka.scaladsl.{Consumer, Producer}
import akka.stream.scaladsl.{Keep, MergeHub, Sink, Source}
import akka.stream.{ActorMaterializer, KillSwitches}
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.{ByteArrayDeserializer,
ByteArraySerializer, StringDeserializer, StringSerializer}
import play.api.libs.json.{JsValue, Json}
import utils.Settings
import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}
class JobConsumerActor
(val sink: Sink[JsValue, NotUsed])
(implicit materializer: ActorMaterializer, ec: ExecutionContext
) extends Actor {

  val jSONSerde = new JSONSerde[Job]

  val jobConsumerSettings = ConsumerSettings(
    context.system, new StringDeserializer(), new ByteArrayDeserializer())
    .withBootstrapServers(s"${Settings.bootStrapServers}")
    .withGroupId("group1")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  // Read from the job topic, deserialize each record back into a Job,
  // send it to ourselves (see receive below), then commit the offset
  val ((_, killswitch), kafkaConsumerFuture) =
    Consumer.committableSource(jobConsumerSettings,
      Subscriptions.topics(JobTopics.JOB_SUBMIT_TOPIC))
      .mapAsync(1) { msg => {
          val jobBytes = msg.record.value
          val job = jSONSerde.deserializer().deserialize(JobTopics.JOB_SUBMIT_TOPIC, jobBytes)
          self ! job
          msg.committableOffset.commitScaladsl()
        }
      }
      .viaMat(KillSwitches.single)(Keep.both)
      .toMat(Sink.last)(Keep.both)
      .run()

  kafkaConsumerFuture.onComplete {
    case Success(value) => println(s"Got the callback, value = $value")
    case Failure(e) => {
      self ! PoisonPill
    }
  }

  override def postStop(): Unit = {
    super.postStop()
    println(s"JobConsumerActor seen 'Done'")
    killswitch.shutdown()
  }

  override def receive: Receive = {
    case (job: Job) => {
      println(s"JobConsumerActor seen ${job}")
      // Convert back to JSON and push it into the MergeHub sink, which feeds
      // the BroadcastHub and ultimately the Comet forever frame
      val finalJsonValue = Json.toJson(job)
      Source.single(finalJsonValue).runWith(sink)
    }
    case Done => {
      println(s"JobConsumerActor seen 'Done'")
      killswitch.shutdown()
      self ! PoisonPill
    }
    case Init => {
      println("JobConsumerActor saw init")
    }
  }
}
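For completeness, the small helpers referenced by these actors (Init, JobTopics, and Settings) are not shown in this post; they are presumably little more than the following (the bootstrap servers value is an assumed local default, not taken from the project):
// Marker message used to kick the consumer actor into life
case object Init

object JobTopics {
  // The single topic that the entire Job lifecycle travels over
  val JOB_SUBMIT_TOPIC = "job-submit-topic"
}

object Settings {
  // Assumed default for local development; the real project reads configuration
  val bootStrapServers = "localhost:9092"
}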
Have a New RxJs Based Observable Over the Comet Based Forever Frame, and Ensure That It Is Working
So, at the end of the pipeline, we have a forever frame in the browser (always available) that we wish to get events from. Ideally, we want to turn this rather bland event into a nicer RxJs Observable. So how do we do that? It's quite simple - we use this little service, which creates a new Observable from the incoming event for us:
import { injectable, inject } from "inversify";
import { JobEventArgs } from "../domain/JobEventArgs";
import Rx from 'rx';

@injectable()
export class JobStreamService {

    private _jobSourceObservable: Rx.Observable<any>;

    constructor() {
    }

    init = (): void => {
        // The Comet forever frame calls parent.jobChanged(...), which lands here
        window['jobChanged'] = function (incomingJsonPayload: any) {
            let evt = new CustomEvent('onJobChanged', new JobEventArgs(incomingJsonPayload));
            window.dispatchEvent(evt);
        }
        this._jobSourceObservable = Rx.Observable.fromEvent(window, 'onJobChanged');
    }

    getJobStream = (): Rx.Observable<any> => {
        return this._jobSourceObservable;
    }
}
Where the JobEventArgs class looks like this:
export class JobEventArgs {
    detail: any;

    constructor(detail: any) {
        this.detail = detail;
    }
}
We can use this service in other code and subscribe to the RxJs Observable that it exposes. Here is an example of subscribing to it; we will talk much more about this in the next post.
componentWillMount() {
    this._subscription =
        this._jobStreamService.getJobStream()
            .subscribe(
                jobArgs => {
                    console.log('RX saw onJobChanged');
                    console.log('RX x = ', jobArgs.detail);
                },
                error => {
                    console.log('RX saw ERROR');
                    console.log('RX error = ', error);
                },
                () => {
                    console.log('RX saw COMPLETE');
                }
            );
}
Conclusion
I am aware that this post has taken a while to get out there. I had an issue in the middle of this one where I broke something, and I had to unwind a whole bunch of commits and bring them back in one by one to see where it broke. This caused a bit of friction. The other reason this post took so long is that life just gets in the way sometimes. Stupid life, huh?
Next Time
Next time, we will focus our attention on the “View Job” page, which is probably the most complex visual aspect of this project. But we now have all the plumbing in place to support it, so it's just a matter of getting it done. After that page is done, this project is pretty much there. Yay!
