Last Time
So last time we came up with a sort of halfway-house post that would pave the way for this one, where we examined several different REST frameworks for use with Scala. The requirements were that we would be able to use JSON and that there would be both a client and a server side API for the chosen library. Both http4s and Akka Http fitted the bill, and I decided to go with Akka Http as I am more familiar with it.
That examination of REST APIs is what has allowed this post to happen. In this post we will be looking at
- How to install Kafka/Zookeeper and get them running on Windows
- Walking through a KafkaProducer
- Walking through a Kafka Streams processing node, and the duality of streams
- Walking through Kafka Streams interactive queries
Preamble
Just as a reminder this is part of my ongoing set of posts which I talk about here :
https://sachabarbs.wordpress.com/2017/05/01/madcap-idea/, where we will be building up to a point where we have a full app using lots of different stuff, such as these
- WebPack
- React.js
- React Router
- TypeScript
- Babel.js
- Akka
- Scala
- Play (Scala Http Stack)
- MySql
- SBT
- Kafka
- Kafka Streams
Ok so now that we have the introductions out of the way, let's crack on with what we want to cover in this post.
Where is the code?
As usual the code is on GitHub here : https://github.com/sachabarber/MadCapIdea
Before we start, what is this post all about?
Well I don't know if you recall, but we are attempting to create an uber simple Uber-type application where there are drivers/clients. A client can put out a new job and drivers bid for it, and at the end of a job they can rate each other; see the job completion/rating/view rating sections of this previous post : https://sachabarbs.wordpress.com/2017/06/27/madcap-idea-part-6-static-screen-design/
The ratings will be placed into streams and aggregated into permanent storage, and will be available for querying later to display in the React front end. The ratings should be grouped by email, and also searchable by email.
So that is what we are attempting to cover in this post. However, since this is the 1st time we have had to use Kafka, this post will also talk through what you have to go through to get Kafka set up on Windows. In a subsequent post I will attempt to get EVERYTHING up and working (including the Play front end) in Docker containers, but for now we will assume a local install of Kafka; if nothing else it's good to know how to set this up.
How to install Kafka/Zookeeper and get them running on Windows
This section will talk you through how to get Kafka and Zookeeper installed and working on Windows
Step 1 : Download Kafka
Grab Confluent Platform 3.3.0 Open Source : http://packages.confluent.io/archive/3.3/confluent-oss-3.3.0-2.11.zip
Step 2 : Update Dodgy BAT Files
The official Kafka Windows BAT files don’t seem to work in the Confluent Platform 3.3.0 Open Source download. So replace the official [YOUR INSTALL PATH]\confluent-3.3.0\bin\windows BAT files with the ones found here : https://github.com/renukaradhya/confluentplatform/tree/master/bin/windows
Step 3 : Make Some Minor Changes To Log Locations etc etc
Kafka/Zookeeper as installed are set up for Linux, and as such some of the configured paths won’t work on Windows, so we need to adjust them a bit. Let's do that now
- Modify the [YOUR INSTALL PATH]\confluent-3.3.0\etc\kafka\zookeeper.properties file to change the dataDir to something like dataDir=c:/temp/zookeeper
- Modify the [YOUR INSTALL PATH]\confluent-3.3.0\etc\kafka\server.properties file to uncomment the line delete.topic.enable=true (both changes are shown in the excerpts just below this list)
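For reference, after those two edits the relevant lines in my copies of the files look like this (your dataDir path may obviously differ):
# etc/kafka/zookeeper.properties
dataDir=c:/temp/zookeeper
# etc/kafka/server.properties
delete.topic.enable=true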
Step 4 : Running Zookeeper + Kafka + Creating Topics
Now that we have installed everything, it’s just a matter of running stuff up. Sadly, before we can run Kafka we need to run Zookeeper, and before Kafka can accept messages we need to ensure that the Kafka topics are created; topics must exist before messages can be sent to them.
Mmm that sounds like a fair bit of work. Well it is, so I decided to script this into a little PowerShell script. This script is available within the source code at https://github.com/sachabarber/MadCapIdea/tree/master/PowerShellProject/PowerShellProject where the script is called RunPipeline.ps1
So all we need to do is change to the directory that the RunPipeline.ps1 is in, and run it.
Obviously it is set up for my installation folders, so you WILL have to change the variables at the top of the script if you want to run this on your own machine.
Here is the contents of the RunPipeline.ps1 file
$global:mongoDbInstallationFolder = "C:\Program Files\MongoDB\Server\3.5\bin\"
$global:kafkaWindowsBatFolder = "C:\Apache\confluent-3.3.0\bin\windows\"
$global:kafkaTopics =
"rating-submit-topic",
"rating-output-topic"
$global:ProcessesToKill = @()
function RunPipeLine()
{
WriteHeader "STOPPING PREVIOUS SERVICES"
StopZookeeper
StopKafka
Start-Sleep -s 20
WriteHeader "STARTING NEW SERVICE INSTANCES"
StartZookeeper
StartKafka
Start-Sleep -s 20
CreateKafkaTopics
RunMongo
WaitForKeyPress
WriteHeader "KILLING PROCESSES CREATED BY SCRIPT"
KillProcesses
}
function WriteHeader($text)
{
Write-Host "========================================`r`n"
Write-Host "$text`r`n"
Write-Host "========================================`r`n"
}
function StopZookeeper() {
$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-stop.bat"
Write-Host "> Zookeeper Command Line : $zookeeperCommandLine`r`n"
$global:ProcessesToKill += start-process $zookeeperCommandLine -WindowStyle Normal -PassThru
}
function StopKafka() {
$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-stop.bat"
Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine`r`n"
$global:ProcessesToKill += start-process $kafkaServerCommandLine -WindowStyle Normal -PassThru
}
function StartZookeeper() {
$zookeeperCommandLine = $global:kafkaWindowsBatFolder + "zookeeper-server-start.bat"
$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\zookeeper.properties"
Write-Host "> Zookeeper Command Line : $zookeeperCommandLine args: $arguments `r`n"
$global:ProcessesToKill += start-process $zookeeperCommandLine $arguments -WindowStyle Normal -PassThru
}
function StartKafka() {
$kafkaServerCommandLine = $global:kafkaWindowsBatFolder + "kafka-server-start.bat"
$arguments = $global:kafkaWindowsBatFolder + "..\..\etc\kafka\server.properties"
Write-Host "> Kafka Server Command Line : $kafkaServerCommandLine args: $arguments `r`n"
$global:ProcessesToKill += start-process $kafkaServerCommandLine $arguments -WindowStyle Normal -PassThru
}
function CreateKafkaTopics()
{
Foreach ($topic in $global:kafkaTopics )
{
$kafkaCommandLine = $global:kafkaWindowsBatFolder + "kafka-topics.bat"
$arguments = "--zookeeper localhost:2181 --create --replication-factor 1 --partitions 1 --topic $topic"
Write-Host "> Create Kafka Topic Command Line : $kafkaCommandLine args: $arguments `r`n"
$global:ProcessesToKill += start-process $kafkaCommandLine $arguments -WindowStyle Normal -PassThru
}
}
function RunMongo() {
$mongoexe = $global:mongoDbInstallationFolder + "mongod.exe"
Write-Host "> Mongo Command Line : $mongoexe `r`n"
$global:ProcessesToKill += Start-Process -FilePath $mongoexe -WindowStyle Normal -PassThru
}
function WaitForKeyPress
{
Write-Host -NoNewLine "Press any key to continue....`r`n"
[Console]::ReadKey()
}
function KillProcesses()
{
Foreach ($processToKill in $global:ProcessesToKill )
{
$name = $processToKill | Get-ChildItem -Name
Write-Host "Killing Process : $name `r`n"
$processToKill | Stop-Process -Force
}
}
# Kick off the entire pipeline
RunPipeLine
As you can see this script does a bit more than just run up Zookeeper and Kafka; it also creates the topics and runs MongoDB, which is also required by the main Play application (remember we are using Reactive Mongo for the login/registration side of things).
So far I have not had many issues with the script, though occasionally when trying out new code I do tend to clear out all the Zookeeper/Kafka state accumulated so far, which for me is stored in the dataDir/log locations configured earlier.
It just allows me to start with a clean slate as it were; you shouldn't need to do this that often.
Walking through a KafkaProducer
So the Kafka producer I present here will send a String key and a JSON Ranking object as a payload. Let's have a quick look at the Ranking object and how it gets turned to and from JSON before we look at the producer code.
The Domain Entities
Here is what the domain entities look like. It can be seen that these use the Spray JSON marshalling/unmarshalling support (part of Akka Http). This is not required by the producer but is required by the REST API, which we will look at later. The producer and Kafka Streams code work with a different serialization abstraction, something known as a serde, which as far as I know is a term only used in Kafka. It's quite simple: it stands for Serializer-Deserializer (Serde).
Anyway here are the domain entities and Spray formatters that allow Akka Http (but not Kafka Streams, more on this later) to work with these entities as JSON
package Entities
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
case class Ranking(fromEmail: String, toEmail: String, score: Float)
case class HostStoreInfo(host: String, port: Int, storeNames: List[String])
object AkkaHttpEntitiesJsonFormats {
implicit val RankingFormat = jsonFormat3(Ranking)
implicit val HostStoreInfoFormat = jsonFormat3(HostStoreInfo)
}
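Just to sanity-check these formatters, here is a tiny hypothetical round-trip using spray-json directly; the object name and values here are made up purely for illustration:
import Entities._
import Entities.AkkaHttpEntitiesJsonFormats._
import spray.json._
object FormatsQuickCheck extends App {
  // round-trip a Ranking through the implicit spray-json format defined above
  val ranking = Ranking("driver@here.com", "client@here.com", 4.5f)
  val json = ranking.toJson.compactPrint
  println(json) // {"fromEmail":"driver@here.com","toEmail":"client@here.com","score":4.5}
  val parsedBack = json.parseJson.convertTo[Ranking]
  assert(parsedBack == ranking)
}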
Serdes
So as we just described, Kafka Streams doesn't care about the standard Akka Http/Spray JSON formatters; they are not part of Kafka Streams after all. However Kafka Streams still has some of the same concerns, in that it needs to serialize and de-serialize data (for example when it re-partitions, i.e. shuffles data), so how does it achieve that? Well it uses a weirdly named thing called a “serde”. There are MANY built-in “serde” types, but you can of course create your own to represent your on-the-wire format. I am using JSON, so this is what my generic “Serde” implementation looks like. I should point out that I owe a great deal to a chap called Jendrik, whom I made contact with and who has been doing some great stuff with Kafka; his blog has really helped me out : https://www.madewithtea.com/category/kafka-streams.html
Anyway here is my/his/yours/ours “Serde” code for JSON
import java.lang.reflect.{ParameterizedType, Type}
import java.util
import com.fasterxml.jackson.annotation.JsonInclude
import com.fasterxml.jackson.core.JsonParseException
import com.fasterxml.jackson.core.`type`.TypeReference
import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.exc.{UnrecognizedPropertyException => UPE}
import com.fasterxml.jackson.module.scala.DefaultScalaModule
import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer}
package Serialization {
object Json {
type ParseException = JsonParseException
type UnrecognizedPropertyException = UPE
private val mapper = new ObjectMapper()
mapper.registerModule(DefaultScalaModule)
mapper.setSerializationInclusion(JsonInclude.Include.NON_NULL)
private def typeReference[T: Manifest] = new TypeReference[T] {
override def getType = typeFromManifest(manifest[T])
}
private def typeFromManifest(m: Manifest[_]): Type = {
if (m.typeArguments.isEmpty) {
m.runtimeClass
}
else new ParameterizedType {
def getRawType = m.runtimeClass
def getActualTypeArguments = m.typeArguments.map(typeFromManifest).toArray
def getOwnerType = null
}
}
object ByteArray {
def encode(value: Any): Array[Byte] = mapper.writeValueAsBytes(value)
def decode[T: Manifest](value: Array[Byte]): T =
mapper.readValue(value, typeReference[T])
}
}
class JSONSerializer[T] extends Serializer[T] {
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
override def serialize(topic: String, data: T): Array[Byte] =
Json.ByteArray.encode(data)
override def close(): Unit = ()
}
class JSONDeserializer[T >: Null <: Any : Manifest] extends Deserializer[T] {
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
override def close(): Unit = ()
override def deserialize(topic: String, data: Array[Byte]): T = {
if (data == null) {
return null
} else {
Json.ByteArray.decode[T](data)
}
}
}
class JSONSerde[T >: Null <: Any : Manifest] extends Serde[T] {
override def deserializer(): Deserializer[T] = new JSONDeserializer[T]
override def configure(configs: util.Map[String, _], isKey: Boolean): Unit = ()
override def close(): Unit = ()
override def serializer(): Serializer[T] = new JSONSerializer[T]
}
}
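As a quick illustration of how this serde is used on its own (outside of Kafka), here is a small hypothetical round-trip; the object name and the values are mine, not part of the real project:
import Entities.Ranking
import Serialization.JSONSerde
object SerdeQuickCheck extends App {
  // serialize a Ranking to JSON bytes and read it back using the same serde
  val rankingSerde = new JSONSerde[Ranking]
  val ranking = Ranking("driver@here.com", "client@here.com", 2.5f)
  val bytes = rankingSerde.serializer().serialize("any-topic", ranking)
  val parsedBack = rankingSerde.deserializer().deserialize("any-topic", bytes)
  println(parsedBack) // Ranking(driver@here.com,client@here.com,2.5)
}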
And finally here is what the producer code looks like
As you can see it is actually fairly straightforward: it simply produces a small number of messages (though you can uncomment the line in the code to make it endless), each with a String key and a Ranking as the message. The key is who the ranking is destined for. These will then be aggregated by the streams processing node, and stored as a list against a key (i.e. email).
package Processing.Ratings {
import java.util.concurrent.TimeUnit
import Entities.Ranking
import Serialization.JSONSerde
import Topics.RatingsTopics
import scala.util.Random
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.common.serialization.Serdes
import Utils.Settings
import org.apache.kafka.clients.producer.ProducerConfig
object RatingsProducerApp extends App {
run()
private def run(): Unit = {
val jSONSerde = new JSONSerde[Ranking]
val random = new Random
val producerProps = Settings.createBasicProducerProperties
val rankingList = List(
Ranking("jarden@here.com","sacha@here.com", 1.5f),
Ranking("miro@here.com","mary@here.com", 1.5f),
Ranking("anne@here.com","margeret@here.com", 3.5f),
Ranking("frank@here.com","bert@here.com", 2.5f),
Ranking("morgan@here.com","ruth@here.com", 1.5f))
producerProps.put(ProducerConfig.ACKS_CONFIG, "all")
System.out.println("Connecting to Kafka cluster via bootstrap servers " +
s"${producerProps.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG)}")
val rankingProducer = new KafkaProducer[String, Array[Byte]](
producerProps, Serdes.String.serializer, Serdes.ByteArray.serializer)
for (i <- 0 to 10) {
val ranking = rankingList(random.nextInt(rankingList.size))
val rankingBytes = jSONSerde.serializer().serialize("", ranking)
System.out.println(s"Writing ranking ${ranking} to input topic ${RatingsTopics.RATING_SUBMIT_TOPIC}")
rankingProducer.send(new ProducerRecord[String, Array[Byte]](
RatingsTopics.RATING_SUBMIT_TOPIC, ranking.toEmail, rankingBytes))
Thread.sleep(100)
}
Runtime.getRuntime.addShutdownHook(new Thread(() => {
rankingProducer.close(10, TimeUnit.SECONDS)
}))
}
}
}
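The Settings object used above lives in the GitHub repo and isn't shown in this post. Purely as a guide, a minimal version of the bits the producer needs might look something like the sketch below; the values are assumptions for a local single-broker setup and may not match the real Utils.Settings class:
import java.util.Properties
import org.apache.kafka.clients.producer.ProducerConfig
object Settings {
  // assumed local, single-broker setup
  val bootStrapServers = "localhost:9092"
  // just enough for the producer above; the real Settings in the repo also
  // provides createRatingStreamsProperties, restApiDefaultHostName, etc.
  def createBasicProducerProperties: Properties = {
    val props = new Properties()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootStrapServers)
    props
  }
}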
Walking through a Kafka Streams processing node, and the duality of streams
Before we get started I just wanted to include several excerpts taken from the official Kafka docs : http://docs.confluent.io/current/streams/concepts.html#duality-of-streams-and-tables which talk about KStream and KTable objects (the stream and table abstractions inside Kafka Streams).
When implementing stream processing use cases in practice, you typically need both streams and also databases. An example use case that is very common in practice is an e-commerce application that enriches an incoming stream of customer transactions with the latest customer information from a database table. In other words, streams are everywhere, but databases are everywhere, too.
Any stream processing technology must therefore provide first-class support for streams and tables. Kafka’s Streams API provides such functionality through its core abstractions for streams and tables, which we will talk about in a minute. Now, an interesting observation is that there is actually a close relationship between streams and tables, the so-called stream-table duality. And Kafka exploits this duality in many ways: for example, to make your applications elastic, to support fault-tolerant stateful processing, or to run interactive queries against your application’s latest processing results. And, beyond its internal usage, the Kafka Streams API also allows developers to exploit this duality in their own applications.
A simple form of a table is a collection of key-value pairs, also called a map or associative array. Such a table may look as follows:
The stream-table duality describes the close relationship between streams and tables.
- Stream as Table: A stream can be considered a changelog of a table, where each data record in the stream captures a state change of the table. A stream is thus a table in disguise, and it can be easily turned into a “real” table by replaying the changelog from beginning to end to reconstruct the table. Similarly, aggregating data records in a stream will return a table. For example, we could compute the total number of pageviews by user from an input stream of pageview events, and the result would be a table, with the table key being the user and the value being the corresponding pageview count.
- Table as Stream: A table can be considered a snapshot, at a point in time, of the latest value for each key in a stream (a stream’s data records are key-value pairs). A table is thus a stream in disguise, and it can be easily turned into a “real” stream by iterating over each key-value entry in the table.
Let’s illustrate this with an example. Imagine a table that tracks the total number of pageviews by user (first column of diagram below). Over time, whenever a new pageview event is processed, the state of the table is updated accordingly. Here, the state changes between different points in time – and different revisions of the table – can be represented as a changelog stream (second column).
Because of the stream-table duality, the same stream can be used to reconstruct the original table (third column):
The same mechanism is used, for example, to replicate databases via change data capture (CDC) and, within Kafka Streams, to replicate its so-called state stores across machines for fault tolerance. The stream-table duality is such an important concept for stream processing applications in practice that Kafka Streams models it explicitly via the KStream and KTable abstractions, which we describe in the next sections.
I would STRONGLY urge you all to read the section of the official docs above, as it will really help you should you want to get into Kafka Streams.
Anyway, with all that in mind, how does it relate to the use case we are trying to solve? So far we have a producer that pushes out Ranking objects, and as stated we would ideally like to query these across all processor nodes. As such we should now know that this will involve a KStream and some sort of aggregation into an eventual KTable (where a state store will be used).
Probably the easiest thing to do is to start with the code, so here is the main stream processing code for the rating section of the final app.
import java.util.concurrent.TimeUnit
import org.apache.kafka.common.serialization._
import org.apache.kafka.streams._
import org.apache.kafka.streams.kstream._
import Entities.Ranking
import Serialization.JSONSerde
import Topics.RatingsTopics
import Utils.Settings
import Stores.StateStores
import org.apache.kafka.streams.state.HostInfo
import scala.concurrent.ExecutionContext
package Processing.Ratings {
class RankingByEmailInitializer extends Initializer[List[Ranking]] {
override def apply(): List[Ranking] = List[Ranking]()
}
class RankingByEmailAggregator extends Aggregator[String, Ranking,List[Ranking]] {
override def apply(aggKey: String, value: Ranking, aggregate: List[Ranking]) = {
value :: aggregate
}
}
object RatingStreamProcessingApp extends App {
implicit val ec = ExecutionContext.global
run()
private def run() : Unit = {
val stringSerde = Serdes.String
val rankingSerde = new JSONSerde[Ranking]
val listRankingSerde = new JSONSerde[List[Ranking]]
val builder: KStreamBuilder = new KStreamBuilder
val rankings = builder.stream(stringSerde, rankingSerde, RatingsTopics.RATING_SUBMIT_TOPIC)
val rankingTable = rankings.groupByKey(stringSerde,rankingSerde)
.aggregate(
new RankingByEmailInitializer(),
new RankingByEmailAggregator(),
listRankingSerde,
StateStores.RANKINGS_BY_EMAIL_STORE
)
rankingTable.toStream.print()
val streams: KafkaStreams = new KafkaStreams(builder, Settings.createRatingStreamsProperties)
val restEndpoint:HostInfo = new HostInfo(Settings.restApiDefaultHostName, Settings.restApiDefaultPort)
System.out.println(s"Connecting to Kafka cluster via bootstrap servers ${Settings.bootStrapServers}")
System.out.println(s"REST endpoint at http://${restEndpoint.host}:${restEndpoint.port}")
streams.cleanUp();
streams.start()
val restService = new RatingRestService(streams, restEndpoint)
restService.start()
Runtime.getRuntime.addShutdownHook(new Thread(() => {
streams.close(10, TimeUnit.SECONDS)
restService.stop
}))
()
}
}
}
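The StateStores object referenced above (and used again later by the interactive-queries REST service) lives in the repo rather than in this post. To make the rest of the walkthrough easier to follow, here is a rough sketch of the shape of it; the store name and the retry loop are my approximation of the usual Confluent "wait until the store is queryable" pattern, so treat it as illustrative rather than the exact repo code:
package Stores
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.errors.InvalidStateStoreException
import org.apache.kafka.streams.state.QueryableStoreType
import scala.concurrent.{ExecutionContext, Future}
object StateStores {
  // the name the aggregation above materializes its KTable under (assumed value)
  val RANKINGS_BY_EMAIL_STORE = "rankings-by-email-store"
  // keep retrying until the store is ready (it is unavailable while a rebalance
  // is in flight), then hand the queryable store back wrapped in a Future
  def waitUntilStoreIsQueryable[T](storeName: String,
                                   queryableStoreType: QueryableStoreType[T],
                                   streams: KafkaStreams)
                                  (implicit ec: ExecutionContext): Future[T] = Future {
    var store: Option[T] = None
    while (store.isEmpty) {
      try {
        store = Some(streams.store(storeName, queryableStoreType))
      } catch {
        case _: InvalidStateStoreException => Thread.sleep(100)
      }
    }
    store.get
  }
}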
Remember, the idea is to get a Ranking for a user (based on their email address), and store all the Rankings associated with them in some sequence/list such that they can be retrieved in one go based on a key, where the key would be the user's email and the value would be this list of Ranking objects. I think with the formal discussion from the official Kafka docs and my actual rating requirement, the above should hopefully be pretty clear.
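To make that concrete, here is a tiny illustrative trace of what the initializer/aggregator pair does for a single key. The values are made up, and in reality Kafka Streams is the thing that drives these calls:
import Entities.Ranking
import Processing.Ratings.{RankingByEmailAggregator, RankingByEmailInitializer}
object AggregationTrace extends App {
  val aggregator = new RankingByEmailAggregator()
  // the initializer supplies the empty list for a key we have never seen before
  val empty  = new RankingByEmailInitializer().apply()
  // each incoming Ranking for the key is prepended to the running list
  val first  = aggregator.apply("sacha@here.com", Ranking("jarden@here.com", "sacha@here.com", 1.5f), empty)
  val second = aggregator.apply("sacha@here.com", Ranking("miro@here.com", "sacha@here.com", 3.0f), first)
  println(second) // List(Ranking(miro@here.com,sacha@here.com,3.0), Ranking(jarden@here.com,sacha@here.com,1.5))
}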
Walking through Kafka Streams interactive queries
So now that we have gone through how data is produced, and transformed (well actually I did not do too much transformation other than a simple map, but trust me you can), and how we aggregate results from a KStream to a KTable (and its state store), we will move on to see how we can use Kafka interactive queries to query the state stores.
One important concept is that if you used multiple partitions for your original topic, the state may be spread across n-many processing nodes. For this project I have only chosen to use 1 partition, but I have written the code to support n-many.
So let's assume that each node could read a different segment of data, or that each node must read from n-many partitions (there is not actually a 1:1 mapping between nodes and partitions; the elastic-scaling-of-your-application and parallelism-model chapters of the docs are 2 must-read sections on this). We would need each node to expose a REST API to allow its OWN state store to be read. By reading ALL the state stores we are able to get a total view of ALL the persisted data across ALL the partitions. I urge all of you to read this section of the official docs : http://docs.confluent.io/current/streams/developer-guide.html#querying-remote-state-stores-for-the-entire-application
This diagram has also been shamelessly stolen from the official docs:
I think this diagram does an excellent job of showing 3 separate processor nodes, each of which may have a bit of state. ONLY by assembling ALL the data from these nodes are we able to see the ENTIRE dataset.
Kafka allows this via metadata about the streams, where we can use the exposed metadata to help us gather the state store data. To do this we first need a MetadataService, which for me is as follows:
package Processing.Ratings
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.StreamsMetadata
import java.util.stream.Collectors
import Entities.HostStoreInfo
import org.apache.kafka.common.serialization.Serializer
import org.apache.kafka.connect.errors.NotFoundException
import scala.collection.JavaConverters._
class MetadataService(val streams: KafkaStreams) {
def streamsMetadata() : List[HostStoreInfo] = {
val metadata = streams.allMetadata
return mapInstancesToHostStoreInfo(metadata)
}
def streamsMetadataForStore(store: String) : List[HostStoreInfo] = {
val metadata = streams.allMetadataForStore(store)
return mapInstancesToHostStoreInfo(metadata)
}
def streamsMetadataForStoreAndKey[T](store: String, key: T, serializer: Serializer[T]) : HostStoreInfo = {
val metadata = streams.metadataForKey(store, key, serializer)
if (metadata == null)
throw new NotFoundException(
s"No metadata could be found for store : ${store}, and key type : ${key.getClass.getName}")
HostStoreInfo(metadata.host, metadata.port, metadata.stateStoreNames.asScala.toList)
}
def mapInstancesToHostStoreInfo(metadatas : java.util.Collection[StreamsMetadata]) : List[HostStoreInfo] = {
metadatas.stream.map[HostStoreInfo](metadata =>
HostStoreInfo(
metadata.host(),
metadata.port,
metadata.stateStoreNames.asScala.toList))
.collect(Collectors.toList())
.asScala.toList
}
}
This metadata service is used to obtain the state store information, which we can then use to extract the state data we want (it’s a key value store really).
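Just to make the intent of those three methods a little more concrete, here is a hypothetical usage sketch; it assumes an already-started KafkaStreams instance called streams, and uses the store name from the processing code above:
// hypothetical usage sketch - 'streams' is an already started KafkaStreams instance
def printWhereTheDataLives(streams: org.apache.kafka.streams.KafkaStreams): Unit = {
  val metadataService = new MetadataService(streams)
  // every running instance of this streams application, and the stores each one hosts
  metadataService.streamsMetadata().foreach(println)
  // only the instances that host the rankings store
  metadataService.streamsMetadataForStore(Stores.StateStores.RANKINGS_BY_EMAIL_STORE).foreach(println)
  // the single instance that owns the partition holding this particular key
  val owner = metadataService.streamsMetadataForStoreAndKey[String](
    Stores.StateStores.RANKINGS_BY_EMAIL_STORE,
    "sacha@here.com",
    org.apache.kafka.common.serialization.Serdes.String().serializer())
  println(s"Rankings for sacha@here.com live on ${owner.host}:${owner.port}")
}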
The next thing we need to do is expose a REST API to allow us to get at the state. Let's see that now.
package Processing.Ratings
import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.state.HostInfo
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.stream.ActorMaterializer
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json.DefaultJsonProtocol._
import Entities.AkkaHttpEntitiesJsonFormats._
import Entities._
import Stores.StateStores
import akka.http.scaladsl.marshalling.ToResponseMarshallable
import org.apache.kafka.common.serialization.Serdes
import scala.concurrent.{Await, ExecutionContext, Future}
import akka.http.scaladsl.unmarshalling.Unmarshal
import spray.json._
import scala.util.{Failure, Success}
import org.apache.kafka.streams.state.QueryableStoreTypes
import scala.concurrent.duration._
object RestService {
val DEFAULT_REST_ENDPOINT_HOSTNAME = "localhost"
}
class RatingRestService(val streams: KafkaStreams, val hostInfo: HostInfo) {
val metadataService = new MetadataService(streams)
var bindingFuture: Future[Http.ServerBinding] = null
implicit val system = ActorSystem("rating-system")
implicit val materializer = ActorMaterializer()
implicit val executionContext = system.dispatcher
def start() : Unit = {
val emailRegexPattern = """\w+""".r
val storeNameRegexPattern = """\w+""".r
val route =
path("test") {
get {
parameters('email.as[String]) { (email) =>
complete(HttpEntity(ContentTypes.`text/html(UTF-8)`,
s"<h1>${email}</h1>"))
}
}
} ~
path("ratingByEmail") {
get {
parameters('email.as[String]) { (email) =>
try {
val host = metadataService.streamsMetadataForStoreAndKey[String](
StateStores.RANKINGS_BY_EMAIL_STORE,
email,
Serdes.String().serializer()
)
var future:Future[List[Ranking]] = null
if(!thisHost(host))
future = fetchRemoteRatingByEmail(host, email)
else
future = fetchLocalRatingByEmail(email)
val rankings = Await.result(future, 20 seconds)
complete(rankings)
}
catch {
case (ex: Exception) => {
val finalList:List[Ranking] = scala.collection.immutable.List[Ranking]()
complete(finalList)
}
}
}
}
} ~
path("instances") {
get {
complete(ToResponseMarshallable.apply(metadataService.streamsMetadata))
}
}~
path("instances" / storeNameRegexPattern) { storeName =>
get {
complete(ToResponseMarshallable.apply(metadataService.streamsMetadataForStore(storeName)))
}
}
bindingFuture = Http().bindAndHandle(route, hostInfo.host, hostInfo.port)
println(s"Server online at http://${hostInfo.host}:${hostInfo.port}/\n")
Runtime.getRuntime.addShutdownHook(new Thread(() => {
bindingFuture
.flatMap(_.unbind())
.onComplete(_ => system.terminate())
}))
}
def fetchRemoteRatingByEmail(host:HostStoreInfo, email: String) : Future[List[Ranking]] = {
// note: use the REMOTE host we were given, not this instance's own hostInfo
val requestPath = s"http://${host.host}:${host.port}/ratingByEmail?email=${email}"
println(s"Client attempting to fetch from online at ${requestPath}")
val responseFuture: Future[List[Ranking]] = {
Http().singleRequest(HttpRequest(uri = requestPath))
.flatMap(response => Unmarshal(response.entity).to[List[Ranking]])
}
responseFuture
}
def fetchLocalRatingByEmail(email: String) : Future[List[Ranking]] = {
val ec = ExecutionContext.global
val host = metadataService.streamsMetadataForStoreAndKey[String](
StateStores.RANKINGS_BY_EMAIL_STORE,
email,
Serdes.String().serializer()
)
val f = StateStores.waitUntilStoreIsQueryable(
StateStores.RANKINGS_BY_EMAIL_STORE,
QueryableStoreTypes.keyValueStore[String,List[Ranking]](),
streams
).map(_.get(email))(ec)
val mapped = f.map(ranking => {
if (ranking == null)
List[Ranking]()
else
ranking
})
mapped
}
def stop() : Unit = {
bindingFuture
.flatMap(_.unbind())
.onComplete(_ => system.terminate())
}
def thisHost(hostStoreInfo: HostStoreInfo) : Boolean = {
hostStoreInfo.host.equals(hostInfo.host()) &&
hostStoreInfo.port == hostInfo.port
}
}
With that final class we are able to run the application and query it using the url http://localhost:8080/ratingByEmail?email=sacha@here.com (the key into the Kafka store here is “sacha@here.com”, and the value will either be an empty list or a List[Ranking] serialized as JSON). The results are shown below, after we have run the producer and used Chrome (or any other REST tool of your choosing) to get the results.
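For reference, since the producer above keys each message by the toEmail field, querying for sacha@here.com returns something along these lines (the number of entries simply depends on how many times the random producer happened to pick that particular ranking):
[
  {"fromEmail":"jarden@here.com","toEmail":"sacha@here.com","score":1.5},
  {"fromEmail":"jarden@here.com","toEmail":"sacha@here.com","score":1.5}
]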
Conclusion
I have found the journey to get here an interesting one. The main issue is that the Kafka docs and examples are all written in Java, and some are not even using Java lambdas (Java 1.8), so the translation from that to Scala code (where there are lambdas everywhere) is sometimes trickier than you might think.
The other thing that has caught me out a few times is that the Scala type system is pretty good at inferring the correct types, so you kind of let it get on with its job. But occasionally it doesn't/can't infer the type correctly; this may surface at compile time if you are lucky, or at run time. In the case of a runtime issue, I found it fairly hard to see exactly which part of the Kafka Streams API needed to be told a bit more type information.
As a general rule of thumb, if there is an overloaded method that takes a serde and one that doesn't, ALWAYS use the one that takes a serde and specify the generic type parameters explicitly. The methods that take serdes are usually the ones that involve some sort of shuffling of data across partitions, so they need serdes to serialize and deserialize correctly.
Other than that I am VERY happy working with Kafka Streams, and once you get into it, it's not that different from working with Apache Spark and RDDs.
Next time
Next time we will be turning our attention back to the web site, where we will expose an endpoint that can be called from the ratings dialog that is launched at the end of a job. This endpoint will take the place of the RatingsProducerApp demonstrated in this post. For that we will be using https://github.com/akka/reactive-kafka. We will also expose a new endpoint to fetch the ratings (via email address) from the Kafka stream processor node.
The idea being that when a job is completed a rating from driver to passenger is given, this is sent to the Kafka stream processor node, and the combined ratings are accumulated for users (by email address) and exposed to be queried. As you can see this post covers the latter part of this requirement already. The only thing we would need to do (as stated above) is replace the RatingsProducerApp demonstrated in this post with a new reactive Kafka producer in the main Play application.