For my job at the moment I am roughly spending 50% of my time working on .NET and the other 50% working with Scala. As such, a lot of Scala/JVM toys have piqued my interest of late. My latest quest was to try and learn Apache Kafka well enough that I at least understood the core concepts. I have even read a book or two on Apache Kafka now, so I feel I am at least talking partial sense in this article.
So what is Apache Kafka, exactly?
Here is what the Apache Kafka folks have to say about their own tool.
Apache Kafka is publish-subscribe messaging rethought as a distributed commit log.
Fast
A single Kafka broker can handle hundreds of megabytes of reads and writes per second from thousands of clients.
Scalable
Kafka is designed to allow a single cluster to serve as the central data backbone for a large organization. It can be elastically and transparently expanded without downtime. Data streams are partitioned and spread over a cluster of machines to allow data streams larger than the capability of any single machine and to allow clusters of co-ordinated consumers.
Durable
Messages are persisted on disk and replicated within the cluster to prevent data loss. Each broker can handle terabytes of messages without performance impact.
Distributed by Design
Kafka has a modern cluster-centric design that offers strong durability and fault-tolerance guarantees.
Taken from http://kafka.apache.org/ on 11/03/16
Apache Kafka was designed and built by a team of engineers at LinkedIn, where I am sure you will agree they probably had to deal with quite a bit of data.
In this article I will talk you through some of the core Apache Kafka concepts, and will also show how to create a Scala Apache Kafka Producer and a Scala Apache Kafka Consumer. I will also sprinkle some RxScala pixie dust on top of the Apache Kafka Consumer code, such that Rx operators can be applied to the incoming Apache Kafka messages.
I have actually tried to emulate a popular .NET article I wrote a while back using SignalR and RX wired up to a simple WPF user interface. Where it makes sense in the sections below I will call out to this .NET article as it may be of interest to some of you.
I have put the code up on my GitHub account. There are actually 2 branches for this Git repo.
- There is a branch where I got the basic plumbing working: just a simple Producer/Consumer, where the Producer sends JSON to the Consumer. You can grab that code from this branch
- There is another branch in which I fleshed things out a lot more, and introduced a more structured/re-usable/generic Consumer API. As I state above, I also introduced RxScala into the mix, such that Rx could piggyback on the Kafka consumer(s) incoming messages. You can grab the more fleshed out version (that this article is based on) from this branch
This section will talk about core Apache Kafka concepts that will help you understand the code a bit better.
As already stated, Apache Kafka is a state of the art distributed / fault tolerant messaging system. It also has some other things up its sleeve, such as the ability to do sharding (partitioning), pub/sub or queue style consumption, and event sourcing.
Let's talk a bit about the simple operations.
There is a producer that produces data, and it would typically connect to an Apache Kafka broker (more than likely in a cluster). The producer pumps out data on what is known as a "Topic". A topic is some identification that both producer and consumer agree upon. The producer will produce messages on a topic, and a consumer can be set up to read from a topic. In its most basic form, that is it. However, let's talk about some of the nice stuff mentioned above.
Partitioning
So Apache Kafka has the concept of partitions for a given topic. What this means is that you can come up with some way of splitting the messages across multiple partitions. Say you keyed messages by IP address: you might decide that all messages that start with 127.192 would go to one partition and all others would go somewhere else. Apache Kafka allows this. You would have to write some code for your specific use case, but it's possible, as the sketch below shows.
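As a minimal sketch (this is not part of the attached demo code, and the helper name and routing rule are made up purely for illustration), one simple way to do this is to pick the partition yourself when sending, using the ProducerRecord constructor that takes an explicit partition number:
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
// Hypothetical routing rule: messages whose IP starts with "127.192" go to
// partition 0, everything else goes to partition 1 (this assumes the topic
// was created with at least 2 partitions)
def sendRoutedByIp(producer : KafkaProducer[String, String],
                   topic : String,
                   ipAddress : String,
                   payload : String) : Unit = {
  val partition : Integer = if (ipAddress.startsWith("127.192")) 0 else 1
  // ProducerRecord(topic, partition, key, value) pins the partition explicitly
  producer.send(new ProducerRecord[String, String](topic, partition, ipAddress, payload))
}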
Pub Sub vs Queues
Messaging traditionally has two models: queuing and publish-subscribe. In a queue, a pool of consumers may read from a server and each message goes to one of them; in publish-subscribe the message is broadcast to all consumers. Apache Kafka offers a single consumer abstraction that generalizes both of these—the consumer group.
Consumers label themselves with a consumer group name, and each message published to a topic is delivered to one consumer instance within each subscribing consumer group. Consumer instances can be in separate processes or on separate machines.
If all the consumer instances have the same consumer group, then this works just like a traditional queue balancing load over the consumers.
If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers.
Perhaps the following diagram will illustrate this a bit better
Event Sourcing
Now it may not be that obvious that a distributed, clustered, fault tolerant messaging system can be used for event sourcing. However, Apache Kafka has the idea of a commit log (per partition) that is able to store messages (on disk) for a fixed period of time. Consumers can choose to commit to the log, which moves their index (offset) into it, but they may also move back to an earlier point and consume all messages from the log from that index.
The index is committed by the consumer but is maintained in Apache Zookeeper, which is a crazy bit of software used to co-ordinate distributed applications. I may do another post on ZooKeeper, but that's another story. For now just know that ZooKeeper is the one that maintains the index (for each partition).
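To give a feel for the replay idea (this is not in the attached demo code, just a minimal sketch against the 0.9 consumer API used in this article), you could rewind an already-subscribed consumer back to the start of its assigned partitions and consume everything again:
import org.apache.kafka.clients.consumer.KafkaConsumer
// Minimal sketch: assumes the consumer is already subscribed and has been
// polled at least once, so that partitions have actually been assigned to it
def replayFromStart(consumer : KafkaConsumer[String, String]) : Unit = {
  val partitions = consumer.assignment().iterator()
  while (partitions.hasNext()) {
    // seekToBeginning moves the offset for the given partition back to the start
    consumer.seekToBeginning(partitions.next())
  }
  val records = consumer.poll(1000)
  val it = records.iterator()
  while (it.hasNext()) {
    println(s"replayed : ${it.next().value()}")
  }
}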
The producer's job is to publish messages to a topic, and that is about all the producer really has to worry about. As stated above, it is the consumer which decides which consumer group it will use, which in turn dictates which consumer(s) get to see which messages.
Consumers are expected to provide several bits of information. Namely:
- Apache Kafka broker connection details
- Topic details
- Consumer group details
It is by supplying these details that the messages are correctly received in the consumer(s).
As mentioned above, Apache Kafka has the concept of a commit log. It is in fact slightly more complex, as each partition has its own commit log. The partitioning strategy is something that your application code must define. However, for every partition of a topic there is a separate commit log/offset. This offset is adjusted by the consumer, where the actual value is stored in Zookeeper (Apache Kafka takes care of that part).
It should be noted that even though you can make multithreaded consumers, only one thread may access the offset for a partition, so even if you had multiple threads running doing the same thing, only one of them would be able to adjust the commit log.
As stated above there is the concept of a "consumer group" which the consumers use. If more than one consumer shares a consumer group, then each message will go to just one of those consumers. If all the consumer instances have different consumer groups, then this works like publish-subscribe and all messages are broadcast to all consumers. The snippet below illustrates the difference.
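As a minimal sketch (the group names here are made up, they are not from the demo code), the only thing that changes between the two behaviours is the group.id the consumers supply:
import java.util.Properties
def consumerProps(groupId : String) : Properties = {
  val props = new Properties()
  props.put("bootstrap.servers", "localhost:9092")
  props.put("group.id", groupId)
  props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
  props
}
// Queue semantics: both consumers share "orders-workers", so each message is
// delivered to only one of them
val worker1Props = consumerProps("orders-workers")
val worker2Props = consumerProps("orders-workers")
// Pub/sub semantics: each consumer has its own group, so both see every message
val auditProps   = consumerProps("orders-audit")
val billingProps = consumerProps("orders-billing")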
Apache Kafka uses another Apache project internally, namely Apache Zookeeper. Apache Zookeeper is used to store the consumer offsets. It is outside the scope of this article to get you all familiar with Apache Zookeeper, but in a nutshell Apache Zookeeper is used to provide distributed application synchronization.
This section will talk you through a simple Scala based demo app, which does the following:
- Has a simple Scala producer that pumps out JSON serialized messages to a topic/consumer group
- Has a simple Scala consumer that is capable of reading from many topics
- Has a generic abstraction for a general purpose Apache Kafka client, that also exposes the incoming messages as an RxScala Observable[T] stream
- Has a generic repository abstraction, such that a single repository may be used for monitoring a particular Apache Kafka topic
If you were running things in a production environment this section would not apply. If however you are like me, and like to try things in isolation in the safety of your own box (where my own box is a Windows 10 one), you should read on.
Essentially for production you would have clusters of Apache Kafka and more than likely a Zookeeper cluster too, which would all run on Linux servers.
However, for trying stuff out this is a bit cumbersome. So let's stick with trying to get Apache Kafka / Zookeeper working as a single local instance on a Windows machine. How do we do that?
The best place to start is actually following another CodeProject tutorial:
www.codeproject.com/Articles/1068998/Running-Apache-Kafka-on-Windows-without-Cygwin
If you follow that guide all the way to the end you will have a pretty good understanding of what you need to do in order to get Apache Kafka / Zookeeper up and running.
There are a number of tasks you will need to do every time you restart your box. Essentially you MUST do these 2 steps before you can expect the Producer/Consumer code attached to this article to work for you.
Associated with the github code is a file called "Kafka.txt" which contains some useful notes.
You will need to ensure you run these 2 command lines (as Administrator) each time you wish to run the Producer/Consumer code attached to this article.
NOTE : You may have to change the paths depending on where you installed things
1. Running Zookeeper
cd C:\Apache\Apache-zookeeper-3.4.6\bin
zkserver
2. Running Kafka
cd C:\Apache\Apache-Kafka-0.9.0.0
.\bin\windows\kafka-server-start.bat .\config\server.properties
You will also need to create the topics for the attached Producer/Consumer code. You will only have to do this once, and you must also ensure both Kafka/Zookeeper are running, see the 2 steps above.
Once you know they are both running, we can simply use the following command lines
cd C:\Apache\Apache-Kafka-0.9.0.0\bin\windows
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic fast-messages
kafka-topics.bat --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic order-placed-messages
Where it can be seen that we set up certain things for the topic, such as which ZooKeeper instance to use and the replication factor / partition count. All good stuff I am sure you will agree.
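If you want a quick sanity check that the topics really were created (this is optional, the demo code does not need it), the same kafka-topics.bat tool can list and describe them:
cd C:\Apache\Apache-Kafka-0.9.0.0\bin\windows
kafka-topics.bat --list --zookeeper localhost:2181
kafka-topics.bat --describe --zookeeper localhost:2181 --topic fast-messages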
The producer is the easier component to explain, and essentially consists of a couple of classes and a config file. We will now go through each of them in turn.
The producer needs certain properties to be filled in order to run. For this we just use the Google Guava library, which allows us to read config values easily.
Here is the properties file for the producer
bootstrap.servers=localhost:9092
acks=all
retries=0
batch.size=16384
auto.commit.interval.ms=1000
linger.ms=0
key.serializer=org.apache.kafka.common.serialization.StringSerializer
value.serializer=org.apache.kafka.common.serialization.StringSerializer
block.on.buffer.full=true
The messages themselves are simple case classes that are serialized across the wire using the Play JSON library. Here are the messages that are included with the Producer.
package Messages
import play.api.libs.json.Json
case class FastMessage(name: String, number: Int)
object FastMessageJsonImplicits {
implicit val fastMessageFmt = Json.format[FastMessage]
implicit val fastMessageWrites = Json.writes[FastMessage]
implicit val fastMessageReads = Json.reads[FastMessage]
}
package Messages
import play.api.libs.json.Json
case class OrderPlacedMessage(timeOfMessage: java.util.Date)
object OrderPlacedMessageJsonImplicits {
implicit val orderPlacedMessageFmt = Json.format[OrderPlacedMessage]
implicit val orderPlacedMessageWrites = Json.writes[OrderPlacedMessage]
implicit val orderPlacedMessageReads = Json.reads[OrderPlacedMessage]
}
Not much more to say there; these objects are pretty much just property bags to be honest.
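Just to show what the Play JSON machinery above gives you (a minimal sketch, not part of the attached code), a FastMessage round-trips to and from JSON like this:
import play.api.libs.json.Json
import Messages.FastMessage
import Messages.FastMessageJsonImplicits._
// Serialize a case class to a JSON string
val json : String = Json.toJson(FastMessage("FastMessage_test", 1)).toString()
// e.g. {"name":"FastMessage_test","number":1}
// Deserialize it back again using the implicit Reads
val roundTripped : FastMessage = Json.parse(json).as[FastMessage]
println(roundTripped)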
The producer needs to produce the messages. How the producer produces messages is up to your business needs. For me I simply wanted a loop that would spit out new messages every so often. To do this I have a Runnable (threading abstraction) that will run the message producing logic (for a given message type) every x-time. As there are 2 messages that the demo code supports, there are 2 such Runnable classes. We will look at these in one second.
For now here is the producer's code, where it can be seen that this class pretty much just hands off the execution of the 2 message runners to the standard Executors.newSingleThreadScheduledExecutor JVM executor.
import org.apache.kafka.clients.producer.KafkaProducer
import java.util.Properties
import com.google.common.io.Resources
import java.util.concurrent.Executors
import java.util.concurrent.TimeUnit
object ScalaProducer {
def main(args: Array[String]): Unit = {
val scalaProducer = new ScalaProducer()
scalaProducer.run(args)
}
}
class ScalaProducer {
def run(args: Array[String]) : Unit = {
println("Press enter to start producer")
scala.io.StdIn.readLine()
var producer : KafkaProducer[String, String] = null
var closeableKafkaProducer : CloseableKafkaProducer = null
try {
val props = Resources.getResource("producer.props").openStream()
val properties = new Properties()
properties.load(props)
producer = new KafkaProducer[String,String](properties)
closeableKafkaProducer = new CloseableKafkaProducer(producer)
val fastMessageRunnable =
new FastMessageRunnable("fast-messages",closeableKafkaProducer)
val fastMessageRunnerScheduler =
Executors.newSingleThreadScheduledExecutor()
fastMessageRunnerScheduler.scheduleAtFixedRate(fastMessageRunnable, 0, 3, TimeUnit.SECONDS);
val orderPlacedMessageRunnable =
new OrderPlacedMessageRunnable("order-placed-messages",closeableKafkaProducer)
val orderPlacedMessageScheduler =
Executors.newSingleThreadScheduledExecutor()
orderPlacedMessageScheduler.scheduleAtFixedRate(orderPlacedMessageRunnable, 0, 1, TimeUnit.SECONDS);
println("producing messages")
scala.io.StdIn.readLine()
}
catch {
case throwable : Throwable =>
val st = throwable.getStackTrace()
println(s"Got exception : $st")
}
finally {
if(closeableKafkaProducer != null) {
closeableKafkaProducer.closeProducer()
}
}
}
}
As stated above, I took the decision to put the producing of each message in its own Runnable, which would be scheduled to produce a new message of the required type every x-time. So what do these Runnable classes look like? Well, they start with a simple base class that does the actual publishing of the message using the KafkaProducer[String,String] class, which I wrap like this just so I can close it safely at any time:
import org.apache.kafka.clients.producer.KafkaProducer
class CloseableKafkaProducer(val producer : KafkaProducer[String, String]) {
  var isClosed : Boolean = false
  def closeProducer() : Unit = {
    if(!isClosed) {
      producer.close()
      // remember that we have closed, so we never close twice
      isClosed = true
    }
  }
}
The actual Runnable code looks like this, where we simply use the KafkaProducer[String,String] send(..) method:
import org.apache.kafka.clients.producer.ProducerRecord
abstract class MessageRunnable extends Runnable {
val topic : String
val closeableKafkaProducer : CloseableKafkaProducer
override def run() : Unit = {
try {
val message = produceMessage()
println("running > " + message)
closeableKafkaProducer.producer.send(new ProducerRecord[String, String](topic, message))
closeableKafkaProducer.producer.flush()
println(s"Sent message on topic '$topic'")
}
catch {
case throwable: Throwable => {
val errMessage = throwable.getMessage
println(s"Got exception : $errMessage")
closeableKafkaProducer.closeProducer()
}
}
}
def produceMessage() : String
}
Inheritors of this class are used to create the correct JSON format to send across the wire, as shown below for the "fast-messages" topic, where we make use of the FastMessage Scala case class, which is sent as a JSON payload.
import java.util.Calendar
import play.api.libs.json.Json
import Messages.{FastMessage}
import Messages.FastMessageJsonImplicits._
class FastMessageRunnable(
val topic : String,
val closeableKafkaProducer : CloseableKafkaProducer)
extends MessageRunnable {
override def produceMessage(): String = {
Json.toJson(FastMessage("FastMessage_" +
Calendar.getInstance().getTime().toString(),1)).toString()
}
}
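The OrderPlacedMessageRunnable that the producer's main method schedules is not shown above; a minimal sketch, assuming it simply mirrors FastMessageRunnable but emits an OrderPlacedMessage, would look something like this:
import java.util.Calendar
import play.api.libs.json.Json
import Messages.OrderPlacedMessage
import Messages.OrderPlacedMessageJsonImplicits._
class OrderPlacedMessageRunnable(
  val topic : String,
  val closeableKafkaProducer : CloseableKafkaProducer)
  extends MessageRunnable {
  override def produceMessage(): String = {
    // same pattern as FastMessageRunnable, just a different payload type
    Json.toJson(OrderPlacedMessage(Calendar.getInstance().getTime())).toString()
  }
}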
NOTE : For the consumer I only set up the "fast-messages" topic code, but you would just follow how that works for the "order-placed-messages" topic. It should be an identical setup.
The consumer is a bit more complicated than the producer, but this is only the case due to the fact that I wanted to carry out certain things, such as:
- I wanted to use Rx
- I wanted to come up with a generic approach, such that it would act more like a repository that you would just use if you wanted to listen to a certain message type. Essentially the messages would be exposed as an Observable[T], such that you could use all the fancy Rx (for Scala) goodies.
I will walk through each of the composite parts that make this happen.
The consumer needs certain properties to be filled in order to run. For this we just use the Google Guava library, which allows us to read config values easily.
Here is the properties file for the consumer
bootstrap.servers=localhost:9092
group.id=test
enable.auto.commit=true
key.deserializer=org.apache.kafka.common.serialization.StringDeserializer
value.deserializer=org.apache.kafka.common.serialization.StringDeserializer
# fast session timeout makes it more fun to play with failover
session.timeout.ms=10000
# These buffer sizes seem to be needed to avoid consumer switching to
# a mode where it processes one bufferful every 5 seconds with multiple
# timeouts along the way. No idea why this happens.
fetch.min.bytes=50000
receive.buffer.bytes=262144
max.partition.fetch.bytes=2097152
As I stated above, I wanted to be able to use Rx and create re-usable bits of the architecture. As such I put my thinking cap on, and decided what I needed was a generic Kafka consumer, and a specialization of that which simply dealt with the correct type of JSON message deserialization (like we did for the producer). To this end there is one common base class (there will only ever be this one, see GenericKafkaConsumer) and one consumer class (see FastMessageKafkaConsumer) per message type.
Let's start with the common Kafka consumer base class, where the following are the main points of the code:
- This generic class takes a topic, and will only deal with a message if the incoming record topic matches the requested topic
- There is a message reading loop which makes use of the KafkaConsumer[String, String] to read the messages off the wire
- There is an abstract method readTopicJson(..) which is expected to be implemented by inheritors of this class
- There is an RxScala Subject being used to expose the incoming messages as an Observable[T]
GenericKafkaConsumer
import java.io.Closeable
import java.util.concurrent.{TimeUnit, Executors, ExecutorService}
import com.google.common.io.Resources
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.ConsumerRecords
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Arrays
import java.util.Properties
import java.util.Random
import play.api.libs.json.{Reads, Json}
import rx.lang.scala.Observable
import rx.lang.scala.subjects.PublishSubject
abstract class GenericKafkaConsumer[T](topic : String) extends Closeable with Runnable {
val topicSubject = PublishSubject.apply[T]()
var consumer : KafkaConsumer[String, String] = null
val pool : ExecutorService = Executors.newFixedThreadPool(1)
var shouldRun : Boolean = true
def startConsuming() : Unit = {
pool.execute(this)
}
def run() : Unit = {
try {
val props = Resources.getResource("consumer.props").openStream()
val properties = new Properties()
properties.load(props)
if (properties.getProperty("group.id") == null) {
properties.setProperty("group.id", "group-" + new Random().nextInt(100000))
}
consumer = new KafkaConsumer[String, String](properties)
consumer.subscribe(Arrays.asList(topic))
var timeouts = 0
println(s"THE TOPIC IS : $topic")
while (shouldRun) {
println("consumer loop running, wait for messages")
val records : ConsumerRecords[String, String] = consumer.poll(200)
val recordCount = records.count()
if (recordCount == 0) {
timeouts = timeouts + 1
} else {
println(s"Got $recordCount records after $timeouts timeouts\n")
timeouts = 0
}
val it = records.iterator()
while(it.hasNext()) {
val record : ConsumerRecord[String,String] = it.next()
val recordTopic = record.topic()
if(recordTopic == topic) {
val message = readTopicJson(record,topic)
message.map(x => {
println(s"Message about to be RX published is $x")
topicSubject.onNext(x)
consumer.commitSync()
})
}
else {
println(s"Unknown message seen for topic '$recordTopic' .....crazy stuff")
}
}
}
}
catch {
case throwable : Throwable =>
shouldRun = false
val st = throwable.getStackTrace()
println(s"Got exception : $st")
topicSubject.onError(throwable)
}
finally {
shutdownAndAwaitTermination(pool)
}
}
protected def readJsonResponse[T](
record: ConsumerRecord[String,String],
topicDescription : String)(implicit reader: Reads[T]) : Option[T] = {
try {
println(s"$topicDescription >")
Some(Json.parse(record.value()).as[T])
}
catch {
case throwable: Throwable =>
val st = throwable.getStackTrace()
println(s"Got exception : $st")
None
}
}
def getMessageStream() : Observable[T] = {
topicSubject.asInstanceOf[Observable[T]]
}
override def close() : Unit = {
println(s"GneericKafkaConsumer closed")
shouldRun = false
shutdownAndAwaitTermination(pool)
}
def shutdownAndAwaitTermination(pool : ExecutorService) : Unit = {
pool.shutdown()
try {
if (!pool.awaitTermination(60, TimeUnit.SECONDS)) {
pool.shutdownNow()
if (!pool.awaitTermination(60, TimeUnit.SECONDS))
println("Pool did not terminate")
}
}
catch {
case throwable : Throwable =>
val st = throwable.getStackTrace()
println(s"Got exception : $st")
pool.shutdownNow()
Thread.currentThread().interrupt()
}
}
def readTopicJson(record : ConsumerRecord[String,String], topic : String) : Option[T]
}
So now that we have gone through what this generic base class does, let's look at the specialization.
FastMessageKafkaConsumer
IMPORTANT NOTE : You will need one of these per message type, and you will need to add a mapping to the MessageClient class too.
So we have a nice base class that does most of the work for us. The only thing it doesn't do is the deserialization. For that I decided to use a specialization of the base class that would do this work. Here is an example of that for the FastMessage. As with the producer side, I make use of the Play JSON library.
import Messages.FastMessage
import Messages.FastMessageJsonImplicits._
import org.apache.kafka.clients.consumer.ConsumerRecord
class FastMessageKafkaConsumer
extends GenericKafkaConsumer[FastMessage](Consumers.fastMessageTopic) {
def pushOneOut(m : FastMessage) : Unit = {
topicSubject.onNext(m)
}
override def readTopicJson(
record : ConsumerRecord[String,String],
topic : String) : Option[FastMessage] = {
readJsonResponse[FastMessage](record,topic)
}
}
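For the "order-placed-messages" topic (which, as noted, the attached demo does not wire up), a matching specialization would, I assume, look something like the following. The Consumers.orderPlacedMessageTopic constant is hypothetical and would need adding alongside the existing fastMessageTopic one, along with an extra entry in the MessageClient map discussed below:
import Messages.OrderPlacedMessage
import Messages.OrderPlacedMessageJsonImplicits._
import org.apache.kafka.clients.consumer.ConsumerRecord
// hypothetical: assumes a Consumers.orderPlacedMessageTopic constant exists
class OrderPlacedMessageKafkaConsumer
  extends GenericKafkaConsumer[OrderPlacedMessage](Consumers.orderPlacedMessageTopic) {
  override def readTopicJson(
    record : ConsumerRecord[String,String],
    topic : String) : Option[OrderPlacedMessage] = {
    readJsonResponse[OrderPlacedMessage](record, topic)
  }
}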
So far we have a generic Kafka consumer that exposes an RxScala Subject as an Observable[T], which gets its OnNext called whenever an incoming message is read that matches the topic of the consumer. We have also seen that there are specializations of this generic base class that provide the correct JSON deserialization.
So what else did we want to do?
Well if you recall I said I wanted to achieve the following:
- I wanted to use Rx
- I wanted to come up with a generic approach, such that it would act more like a repository that you would just use if you wanted to listen to a certain message type. Essentially the messages would be exposed as an Observable[T], such that you could use all the fancy Rx (for Scala) goodies.
So how do we do that? Put quite simply, we want to create a better, more reliable, retryable Observable[T] stream. In the demo code this is the job of the MessageClient class, which looks like this:
import rx.lang.scala.subscriptions.CompositeSubscription
import rx.lang.scala.{Subscription, Observable}
class MessageClient() {
val consumerMap = setupMap()
def setupMap() : Map[String, (() => GenericKafkaConsumer[AnyRef])] = {
val map = Map[String, () => GenericKafkaConsumer[AnyRef]]()
val updatedMap = map + (Consumers.fastMessageTopic ->
  (() => new FastMessageKafkaConsumer().asInstanceOf[GenericKafkaConsumer[AnyRef]]))
updatedMap
}
def getMessageStreamForTopic[T](topic : String) : Observable[T] = {
Observable.create[T](observer => {
consumerMap.get(topic) match {
case Some(messageFactory) => {
try {
val streamSource = messageFactory().asInstanceOf[GenericKafkaConsumer[T]]
streamSource.startConsuming()
val sub = streamSource.getMessageStream().subscribe(observer)
CompositeSubscription(sub, Subscription(streamSource.close()))
}
catch {
case throwable : Throwable =>
val st = throwable.getStackTrace()
println(s"Got exception : $st")
Subscription()
}
}
case _ => {
println("OH NO THATS BAD")
observer.onCompleted()
Subscription()
}
}
}).publish.refCount
}
}
Essentially what is going on here is that we use Observable.create(..) to create an Observable[T] for the topic that is being requested. We also ensure that we use publish so that the stream is shared (for late subscribers), and we use refCount to get automatic disposal when there are no active subscriptions left.
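As a tiny standalone sketch of what publish.refCount buys you (this is plain RxScala, nothing to do with Kafka), two subscribers end up sharing a single underlying subscription rather than each triggering their own:
import rx.lang.scala.Observable
import scala.concurrent.duration._
// a cold source that logs every time something actually subscribes to it
val source = Observable.interval(1.second)
  .doOnSubscribe(println("underlying stream started"))
// publish.refCount shares one underlying subscription between all subscribers,
// and tears it down once the last subscriber unsubscribes
val shared = source.publish.refCount
val sub1 = shared.subscribe(x => println(s"subscriber 1 got $x"))
val sub2 = shared.subscribe(x => println(s"subscriber 2 got $x"))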
Now that we have a nice MessageClient class that does most of the work for us, we just need to make use of it within the generic repository code. One could argue that the MessageClient and the code below could be squished together into one class, but having separate classes just felt like a better separation of concerns to me.
The main points below are:
- We new up the MessageClient to create the mappings and to deal with the stream creation (where it uses the previously discussed consumer classes)
- We use several Rx operators that will repeat the stream on failure, publish it to share the stream, and use the auto dispose semantics of refCount
- We use Observable.defer(..) to act as a factory that only produces the Observable[T] when it is first subscribed to
import rx.lang.scala.Observable
object GenericRepository {
private lazy val messageClient = new MessageClient()
def GetMessageStream[T](topic: String): Observable[T] = {
Observable.defer[T](messageClient.getMessageStreamForTopic[T](topic))
.repeat
.publish
.refCount
}
}
With all this in place the final usage ends up looking like this, which I am sure you will agree is pretty simple.
import Messages.FastMessage
object ScalaConsumer {
def main(args: Array[String]): Unit = {
val subs = GenericRepository.GetMessageStream[FastMessage](Consumers.fastMessageTopic)
.subscribe(x => {
println(s"RX SUBJECT stuff working, got this FAST MESSAGE : $x")
})
}
}
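And because the messages come back as an ordinary RxScala Observable[T], you can layer any of the usual Rx operators on top of them. A minimal sketch (the filter predicate and the mapping here are made up for illustration) might look like this:
import Messages.FastMessage
// filter and transform the incoming FastMessage stream before subscribing
val filteredSub = GenericRepository.GetMessageStream[FastMessage](Consumers.fastMessageTopic)
  .filter(m => m.number > 0)
  .map(m => m.name.toUpperCase)
  .subscribe(name => println(s"RX pipeline saw fast message : $name"))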
The Rx parts of this article are very similar to techniques I have used in the .NET world, so I am pretty ok with their usage. If you are interested in the .NET articles here are the links:
We have covered a lot of ground, so how about a nice screen shot to prove that all this stuff actually works?
The screen shot below shows the producer and a consumer (remember the consumer provided within the demo code at GitHub only consumes the "fast-messages" topic, which is why you NEVER see any consumer output for the "order-placed-messages" topic).
And that is all I have to say this time. I hope you have enjoyed it. A comment/vote is always well received, so don't be shy.