Introduction
This article presents a simple Apache Kafka producer / consumer application written in C# and Scala. The two applications are interoperable, share similar functionality and structure, and operate on the same data in Kafka.
Prerequisites
To run the code, Zookeeper and Kafka should be installed. Appropriate guidance for a local Windows installation may be found here. Zookeeper and Kafka are installed as Windows services but may also be run as console applications. For simplicity, in our code sample we run them as console applications using the appropriate command files:
- 1 - Start_Zookeeper.cmd
- 2 - Start_Kafka.cmd
The leading numbers in the file names indicate the order in which they should be started: Zookeeper first. I installed Zookeeper version 3.5.5 to directory C:\zookeeper-3.5.5 and Kafka version 2.12-2.3.0 to directory C:\kafka_2.12-2.3.0. These directories are used by the start cmd files, so if you have different directories, the cmd files should be changed accordingly. In my installation, the folders containing Kafka logs and Zookeeper data are kafka-logs and zookeeper-data under C:\kafka_2.12-2.3.0. These directories may be deleted before the next run of Zookeeper and Kafka to remove previous data.
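For reference, the two command files boil down to something like the following sketch (the actual files ship with the sources; the paths match the installation directories above):

```bat
rem 1 - Start_Zookeeper.cmd (sketch)
cd /d C:\zookeeper-3.5.5
call bin\zkServer.cmd

rem 2 - Start_Kafka.cmd (sketch)
cd /d C:\kafka_2.12-2.3.0
call bin\windows\kafka-server-start.bat config\server.properties
```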
Note: A Java update on your computer may change the environment variable JAVA_HOME used by Zookeeper and Kafka, and they may cease to operate.
Design
Dedicated classes produce and consume messages which, in our case, constitute string Key -> Value pairs. The Value is organized as an object built according to an Avro schema in JSON format. This is the standard and recommended approach that ensures interoperability between different Kafka producers / consumers. E.g., in our case, objects of the same type are produced and consumed by the C# and Scala applications. Normally, the Avro schema is available from a Kafka Schema Registry (detailed information about it may be found, e.g., in an excellent article by Sacha Barber). This server should be available to all Kafka producers and consumers of the required object. Kafka Schema Registry serves its clients via a REST API; in particular, an HTTP POST request of a certain format returns the Avro schema to the client. Appropriate classes of the `Confluent.Kafka` library communicate with the Kafka Schema Registry under the hood without any additional code, but the Kafka Schema Registry itself has to be installed and maintained.

Using a uniform Avro object schema across different Kafka clients is always beneficial. But in some relatively simple cases, a standard full-blown Kafka Schema Registry is overkill, and the schema may be served from a simpler server or simply read from a file. The code of this article reads the Avro schema either with a simple HTTP `GET` request or from a local JSON file.
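For illustration, such an Avro schema may look as follows (the record and field names here are hypothetical, apart from the `Id` field mentioned later in the article; the actual schema.json ships with the solutions):

```json
{
  "type": "record",
  "name": "SampleRecord",
  "namespace": "KafkaHelperLib",
  "fields": [
    { "name": "Id",   "type": "int"    },
    { "name": "Name", "type": "string" }
  ]
}
```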
Wrappers around the `Confluent.Kafka` producer and consumer are provided. The consumer wrapper allows a Kafka client to subscribe to messages and process them with a given callback. The producer wrapper offers a method to send messages to Kafka. Constructors of both wrappers read the Avro schema in a customized way (either from some Web server or from a file).
In both the C# and Scala environments, the application is organized in a similar way. First, data for Avro schema access and Kafka configuration are provided (hardcoded for simplicity). Then a consumer is created, and its asynchronous method that starts message consumption is called. This method takes a message processing callback as its parameter. After this preparation, a producer is created and its method to send messages is called periodically. As said above, messages are placed in Kafka as Key -> Value pairs, where Key is of type `string` and Value is of the standard Avro type `GenericRecord`.
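A minimal sketch of this flow in C# follows; all names and signatures below are assumptions illustrating the idea, not the exact API of the article's projects:

```csharp
// Inside an async Main (sketch; wrapper signatures are assumptions)
var recordConfig = new RecordConfig(schemaRegistryUrl);      // schema from Web server or local file
var consumer = new KafkaConsumer(recordConfig, brokerList, topic, groupId);
_ = consumer.StartConsuming((key, value) =>                  // asynchronous consumption with a callback
        Console.WriteLine($"{key} -> {value}"));

var producer = new KafkaProducer(recordConfig, brokerList, topic);
while (true)
{
    await producer.SendAsync("the-key", CreateGenericRecord()); // Value is an Avro GenericRecord
    await Task.Delay(TimeSpan.FromSeconds(1));
}
```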
Access Schema
For both the C# and Scala solutions, the Avro schema file schema.json is placed in the directory wwwroot under the solution's root directory. It is accessed either directly via the file system (default) or through a simple Web server. To test the latter option, I used IISExpress on my Windows system. A site to access the schema file should be added to the IISExpress configuration file C:\Users\Igorl\Documents\IISExpress\config\applicationhost.config. The appropriate fragment is placed in the file SiteForIISExpressConfig.txt; its content should be copied into the `<sites>` section of the IISExpress configuration file. Then IISExpress should be started for this site, which can be done by running the file startIIS.cmd placed in the main directory of the C# and Scala solutions.
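The fragment will look roughly like the following sketch (site name, id, port, and physical path here are placeholders; use the values from SiteForIISExpressConfig.txt):

```xml
<site name="SchemaSite" id="2">
  <application path="/">
    <virtualDirectory path="/" physicalPath="C:\path\to\solution\wwwroot" />
  </application>
  <bindings>
    <binding protocol="http" bindingInformation="*:8080:localhost" />
  </bindings>
</site>
```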
Note: In both the C# and Scala solutions, to switch to schema access via IISExpress, please change the value of the hardcoded boolean constant `isFromLocalFile` to `false` (the appropriate line is marked in code with the comment `//1`).
C#
Project `KafkaHelperLib` provides wrappers for the `Confluent.Kafka` producer and consumer. This project targets .NET Standard and therefore may be referenced from both .NET Core and .NET Framework projects. It contains the classes `KafkaConsumer` and `KafkaProducer`, which deserialize / serialize a well-known `GenericRecord` Avro object as the Value of a Kafka data pair. The class `RecordConfig` takes the schema access string `schemaRegistryUrl` as its constructor parameter. Its `public` method `GetSchemaRegistryClient()` returns an object implementing the `ISchemaRegistryClient` interface. Usually, the class `Confluent.SchemaRegistry.CachedSchemaRegistryClient : ISchemaRegistryClient, ...`, which accesses the Kafka Schema Registry, serves as the implementation of this interface. In our simplified case, we created the "stub" class `SchemaRegistryClient` as the implementation of `ISchemaRegistryClient`. The constructors of the `KafkaConsumer` and `KafkaProducer` classes use `RecordConfig` to create the Avro deserializer and serializer objects, respectively.
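To give an idea of the wiring, here is a minimal sketch of how such a producer wrapper may combine `RecordConfig` with Confluent's Avro serializer. The Confluent types are real (Confluent.Kafka / Confluent.SchemaRegistry 1.x); the wrapper's constructor and method signatures are assumptions, not the article's exact code:

```csharp
using System;
using System.Threading.Tasks;
using Avro.Generic;
using Confluent.Kafka;
using Confluent.SchemaRegistry;
using Confluent.SchemaRegistry.Serdes;

public class KafkaProducer : IDisposable
{
    private readonly IProducer<string, GenericRecord> _producer;
    private readonly string _topic;

    public KafkaProducer(RecordConfig recordConfig, string brokerList, string topic)
    {
        _topic = topic;
        // The "stub" SchemaRegistryClient returned here supplies the locally read schema
        ISchemaRegistryClient schemaRegistry = recordConfig.GetSchemaRegistryClient();
        _producer = new ProducerBuilder<string, GenericRecord>(
                new ProducerConfig { BootstrapServers = brokerList })
            // AvroSerializer resolves the writer schema via the schema registry client
            .SetValueSerializer(new AvroSerializer<GenericRecord>(schemaRegistry))
            .Build();
    }

    public Task SendAsync(string key, GenericRecord value) =>
        _producer.ProduceAsync(_topic,
            new Message<string, GenericRecord> { Key = key, Value = value });

    public void Dispose() => _producer.Dispose();
}
```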
Project `HttpClient` is also built as a .NET Standard library available to both .NET Core and .NET Framework. It is taken from an old solution and is used only to implement the HTTP `GET` call that receives the Avro schema from a custom Web server.
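Conceptually, that call is just a plain HTTP `GET` (a minimal sketch; the method name and URL parameter are examples, not the project's actual code):

```csharp
using System.Net.Http;
using System.Threading.Tasks;

public static class SchemaFetcher
{
    // Fetch the raw JSON text of the Avro schema from a Web server
    public static async Task<string> GetSchemaAsync(string url)
    {
        using var http = new HttpClient();
        return await http.GetStringAsync(url);
    }
}
```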
The test application `KafkaApp` is a .NET Core application. It puts together the Kafka producer and consumer. The application provides a callback for consumed objects that simply prints the content of an object to the console. The producer sends objects, created according to the Avro schema, to Kafka.
Scala
The Scala solution is generally similar to the C# one, but here the `GenericRecord` object is serialized to / deserialized from `Array[Byte]` in order to use the standard `ByteArraySerializer` and `ByteArrayDeserializer` classes. The Scala application also prints consumed Kafka pairs to its console. To distinguish between objects produced by C# and Scala, the latter are created with a negative `Id` field.
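The article's helper code is not reproduced here, but such a round trip may be sketched with the plain Apache Avro API as follows (a minimal sketch; the object and method names are assumptions):

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroBytes {
  // GenericRecord -> Array[Byte], suitable for Kafka's ByteArraySerializer
  def serialize(record: GenericRecord, schema: Schema): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](schema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Array[Byte] -> GenericRecord, after Kafka's ByteArrayDeserializer
  def deserialize(bytes: Array[Byte], schema: Schema): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  }
}
```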
Conclusions
The article presents simple code for a Kafka producer and consumer written in C# and Scala. Objects created according to an Avro schema are produced and consumed. The Avro schema is read either from a local file or from a simple Web server.
History
- 26th October, 2019: Initial version