
Simple Kafka Producer and Consumer with C# and Scala

28 Oct 2019 · CPOL · 5 min read
A simple Kafka producer and consumer written in C# and Scala, with the Avro schema read either from a local file or from a simple Web server

Introduction

This article presents a simple Apache Kafka producer / consumer application written in C# and Scala. The applications are interoperable, with similar functionality and structure, and they operate on the same data in Kafka.

Prerequisites

To run the code, Zookeeper and Kafka should be installed. Appropriate guidance for a local Windows installation may be found here. Zookeeper and Kafka are installed as Windows services, but may also be run as console applications. For simplicity, in our code sample, we will run them as console applications using the appropriate command files:

  • 1 - Start_Zookeeper.cmd
  • 2 - Start_Kafka.cmd

Leading numbers in the file names indicate the order in which they start: Zookeeper should be started first. I installed Zookeeper version 3.5.5 to directory C:\zookeeper-3.5.5 and Kafka version 2.12-2.3.0 to directory C:\kafka_2.12-2.3.0. These directories are used by the start cmd files, so if your directories differ, the cmd files should be changed accordingly. In my installation, the folders containing Kafka logs and Zookeeper data are kafka-logs and zookeeper-data under C:\kafka_2.12-2.3.0. These directories may be deleted before the next run of Zookeeper and Kafka in order to remove previous data.
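The two cmd files are not listed in the article; for the installation paths above they would amount to the standard Windows start commands (a sketch under the stated path assumptions, not the article's exact files):

```bat
rem 1 - Start_Zookeeper.cmd  (Zookeeper must be started first)
C:\zookeeper-3.5.5\bin\zkServer.cmd

rem 2 - Start_Kafka.cmd  (start Kafka only after Zookeeper is up)
C:\kafka_2.12-2.3.0\bin\windows\kafka-server-start.bat C:\kafka_2.12-2.3.0\config\server.properties
```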

Note: Updating Java on your computer may change the environment variable JAVA_HOME used by Zookeeper and Kafka, causing them to stop working.

Design

Dedicated classes produce and consume messages, which in our case constitute string Key -> Value pairs. The Value is organized as an object built according to an Avro schema in JSON format. This is the standard and recommended approach that ensures interoperability between different Kafka producers and consumers. E.g., in our case, objects of the same type are produced and consumed by the C# and Scala applications. Normally, the Avro schema is available from a Kafka Schema Registry (detailed information about it may be found, e.g., in an excellent article by Sacha Barber). This server should be available to all Kafka producers and consumers of the required object. Kafka Schema Registry serves its clients with a REST API; in particular, an HTTP POST request of a certain format results in the Avro schema being transferred to a client. Appropriate classes of the Confluent.Kafka library communicate with the Kafka Schema Registry under the hood without any additional code. The Kafka Schema Registry, however, has to be installed and maintained.

Using a uniform Avro object schema across different Kafka clients is always beneficial. But in some relatively simple cases, a standard full-blown Kafka Schema Registry is overkill, and the schema may be made available from a simpler server, or simply from a file. The code in this article reads the Avro schema either with a simple HTTP GET request or from a local JSON file.
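The dual access path (local file vs. HTTP GET) can be sketched in Java with only the standard library; the method name loadSchema and the sample schema below are illustrative, not taken from the article's code:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.nio.file.Files;
import java.nio.file.Path;

public class SchemaAccess {
    // Read the Avro schema JSON either from a local file or via an HTTP GET request.
    static String loadSchema(boolean isFromLocalFile, String source) throws Exception {
        if (isFromLocalFile) {
            return Files.readString(Path.of(source));
        }
        HttpClient client = HttpClient.newHttpClient();
        HttpRequest request = HttpRequest.newBuilder(URI.create(source)).GET().build();
        return client.send(request, HttpResponse.BodyHandlers.ofString()).body();
    }

    public static void main(String[] args) throws Exception {
        // Illustrative schema written to a temp file so the sketch is self-contained.
        Path tmp = Files.createTempFile("schema", ".json");
        Files.writeString(tmp,
            "{\"type\":\"record\",\"name\":\"Item\",\"fields\":[{\"name\":\"Id\",\"type\":\"int\"}]}");
        System.out.println(loadSchema(true, tmp.toString()));
    }
}
```

The same loadSchema call with isFromLocalFile set to false and a URL as source would fetch the schema from a Web server instead, which mirrors the switch the article's isFromLocalFile constant controls.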

Wrappers around the Confluent.Kafka producer and consumer are provided. The consumer wrapper allows a Kafka client to subscribe to messages and process them with a given callback. The producer wrapper offers a method to send messages to Kafka. The constructors of both wrappers read the Avro schema in a customized way (either from some Web server or from a file).

In both the C# and Scala environments, the application is organized in a similar way. First, data for Avro schema access and Kafka configuration are provided (hardcoded for simplicity). Then the consumer is created and its asynchronous method that starts consuming messages is called. This method takes a message processing callback as its parameter. After this preparation, the producer is created and its method to send messages is called periodically. As noted above, messages are placed in Kafka as Key -> Value pairs; the Key is of type string and the Value is of the standard Avro type GenericRecord.
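The overall flow (create the consumer with a processing callback, then send periodically from the producer) can be sketched with an in-memory BlockingQueue standing in for Kafka; every name here is illustrative, and no Kafka client library is involved:

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.BiConsumer;

public class FlowSketch {
    // In-memory stand-in for a Kafka topic holding Key -> Value pairs.
    static final BlockingQueue<Map.Entry<String, String>> topic = new LinkedBlockingQueue<>();

    // "Consumer wrapper": starts an asynchronous loop handing every message to a callback.
    static Thread startConsuming(BiConsumer<String, String> callback) {
        Thread worker = new Thread(() -> {
            try {
                while (true) {
                    Map.Entry<String, String> msg = topic.take();
                    callback.accept(msg.getKey(), msg.getValue());
                }
            } catch (InterruptedException e) {
                // shutdown requested
            }
        });
        worker.start();
        return worker;
    }

    public static void main(String[] args) throws Exception {
        // The consumer is created first; its callback just prints each pair.
        Thread consumer = startConsuming((k, v) -> System.out.println(k + " -> " + v));

        // "Producer": periodically sends Key -> Value pairs.
        for (int i = 0; i < 3; i++) {
            topic.put(new SimpleEntry<>("key-" + i, "value-" + i));
            Thread.sleep(50);
        }
        Thread.sleep(100);   // let the consumer drain the queue
        consumer.interrupt();
    }
}
```

In the real applications, the queue is replaced by a Kafka topic and the String value by a GenericRecord, but the callback wiring is the same.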

Access Schema

For both the C# and Scala solutions, the Avro schema file schema.json is placed in the directory wwwroot under the solution's root directory. It is accessed either directly via the file system (the default) or through a simple Web server. To test the latter approach, I used IISExpress on my Windows system. A site to access the schema file should be added to the IISExpress configuration file C:\Users\Igorl\Documents\IISExpress\config\applicationhost.config. An appropriate fragment is placed in the file SiteForIISExpressConfig.txt. The content of this file should be copied into the <sites> section of the IISExpress configuration file. Then IISExpress should be started for this site, which can be done by running the file startIIS.cmd placed in the main directory of the C# and Scala solutions.
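SiteForIISExpressConfig.txt itself is not reproduced in the article; a fragment of this general shape (site name, id, port, and physical path are all illustrative placeholders) is what gets pasted into the <sites> section of applicationhost.config:

```xml
<site name="SchemaSite" id="2">
  <application path="/">
    <virtualDirectory path="/" physicalPath="C:\Solutions\KafkaDemo\wwwroot" />
  </application>
  <bindings>
    <binding protocol="http" bindingInformation="*:8080:localhost" />
  </bindings>
</site>
```

With such a site running, the schema would be served at a URL like http://localhost:8080/schema.json, the port being whatever the binding specifies.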

Note: In both the C# and Scala solutions, to switch to schema access via IISExpress, please change the value of the hardcoded boolean constant isFromLocalFile to false (the appropriate line is marked in the code with the comment //1).

C#

Project KafkaHelperLib provides wrappers for the Confluent.Kafka producer and consumer. The project targets .NET Standard and therefore can be referenced from both .NET Core and .NET Framework projects. It contains the classes KafkaConsumer and KafkaProducer, which serialize / deserialize the well-known GenericRecord Avro object as the Value of a Kafka data pair. The class RecordConfig takes a schema access string schemaRegistryUrl as its constructor parameter. Its public method GetSchemaRegistryClient() returns an object implementing the ISchemaRegistryClient interface. Usually, the class Confluent.SchemaRegistry.CachedSchemaRegistryClient : ISchemaRegistryClient, ... is used as the implementation of this interface; it accesses the Kafka Schema Registry. In our simplified case, we created a "stub" class SchemaRegistryClient as the implementation of the ISchemaRegistryClient interface. The constructors of the KafkaConsumer and KafkaProducer classes use RecordConfig to create Avro deserializer and serializer objects, respectively.

Project HttpClient is also built as a .NET Standard project available to both .NET Core and .NET Framework. It was taken from an older solution solely to implement the HTTP GET call that receives the Avro schema from a custom Web server.

The test application KafkaApp is a .NET Core application that puts the Kafka producer and consumer together. The application provides a callback for consumed objects that simply prints the content of each object to the console. The producer sends objects, created according to the Avro schema, to Kafka.

Scala

The Scala solution is generally similar to the C# one, but here the GenericRecord object is serialized to / deserialized from Array[Byte] in order to use the standard ByteArraySerializer and ByteArrayDeserializer classes. The Scala application also prints consumed Kafka pairs to its console. To distinguish between objects produced by C# and Scala, the latter are created with a negative Id field.

Conclusions

The article presents simple code for a Kafka producer and consumer written in C# and Scala. Objects created according to an Avro schema are produced and consumed. The Avro schema is read either from a local file or from a simple Web server.

History

  • 26th October, 2019: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)