
Simple Service-Based System for Massive Data Processing with ZooKeeper-Kafka and Redis

Basic infrastructure for massive continuous data acquisition and processing using ZooKeeper-Kafka and Redis
This article discusses a simple infrastructure for data acquisition and processing which allows developers to build a range of software, from a single-process application to a complex distributed multi-service system.

Introduction

This article presents compact software providing basic infrastructure for massive continuous data acquisition and processing. The system is based on well-known products that are widespread in industry. Kafka with ZooKeeper is responsible for data streaming, and Redis acts as in-memory data storage. This product stack is referred to below as ZKR. Here are short excerpts from the Wikipedia descriptions of the ZKR stack elements.

  • Apache ZooKeeper is a software project of the Apache Software Foundation. It is essentially a distributed hierarchical key-value store, which is used to provide a distributed configuration service, synchronization service, and naming registry for large distributed systems.

  • Apache Kafka is an open-source stream processing platform developed by the Apache Software Foundation, written in Scala and Java. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Its storage layer is essentially a "massively scalable pub/sub message queue architected as a distributed transaction log." ZooKeeper is used by Kafka for coordination among consumers.

  • Redis is an open-source in-memory database project implementing a distributed, in-memory key-value store with optional durability. Redis supports different kinds of abstract data structures, such as strings, lists, maps, sets, sorted sets, hyperloglogs, bitmaps and spatial indexes. The name Redis means REmote DIctionary Server.

Although the code sample for the article employs the ZKR stack, the demonstrated approach is rather generic. Thanks to the uniform interfaces, these products may be replaced relatively easily. The overall system structure is depicted in Figure 1.


Figure 1. Overall System Structure.

Data Sources (DS) periodically produce metrics and text data to be stored and processed. DS write the data to the Data Streaming component (ZooKeeper / Kafka in our case). A collection of Data Processing Services is responsible for reading data from Data Streaming, storing them in Data Storage (Redis in our case), and processing them. Results of the processing are also written to the Data Storage. The API for UI and Customer's Applications constitutes an additional layer of services that read data from the Data Storage and provide viewing capabilities for the system. Such a layout is very flexible and convenient. Using a standard, well-performing Data Streaming component spares the developer from writing a lot of tedious, routine communication code. The third-party in-memory Data Storage supports fast read / write operations for the services. Data Streaming and Data Storage provide access to the data via Web APIs and dedicated DLL adapters (the latter are used in the article sample). The Data Processing Services run independently and exchange data via the Data Storage. This layout allows users to have a collection of relatively simple services that may be developed in different programming languages.

Data Model

The main purpose of this system is efficient data acquisition, storage and processing. For the sake of simplicity, data travel from DS via Data Streaming to Data Storage packed in a uniform object defined as class DSData in assembly ObjectsLib. This object consists of a timestamp, an array of floats (an integer indicating the length of the array is not part of DSData but is inserted before the array itself on serialization), and a string. Each DS periodically generates such an object. It is serialized and written to Data Streaming, then read from there by a Data Processing Service. The latter may deserialize the data object for pre-processing if required, and then write it to Data Storage. The Service may decide not to write the data object to Data Storage (perhaps the object is too old or invalid), or to store processed data only. In our system, the Service performs a very simple check of whether the data are too old and, if not, stores them as they are.
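
To make the data model concrete, here is a minimal illustrative sketch of such an object. The actual DSData class is defined in assembly ObjectsLib, and its member names may differ from those shown here.

C#
using System;

// Illustrative sketch only: the actual DSData class lives in ObjectsLib
// and its member names may differ from those shown here.
public class DSData
{
    // Time at which the data source produced the values.
    public DateTime Timestamp { get; set; }

    // Metric values; the array length is not part of the object itself
    // but is written immediately before the array during serialization.
    public float[] Metrics { get; set; }

    // Free-form text: additional data or clarifications regarding the metrics.
    public string Description { get; set; }
}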

Each DSData object is placed in Data Streaming with its identification key. In our case, this is the integer serial number of the DS that generated the object. As was said above, our simple system stores data in Data Storage as they are read from Data Streaming, without any reshuffling. An example of the data structure stored in Data Storage is shown in Figure 2. It shows that some floats may be omitted. The string may contain data and clarifications regarding the metric data (e.g., in our example, that values 0 and 3 were actually written).


Figure 2. Data Object

As you can see, data stored with the same key are placed in inverse chronological order with a certain history depth (if required). The structure of the stored data does not necessarily have to copy the structure of the received object. Usually, one reading operation from Data Storage obtains data for a single key, so the data storage layout should be optimized for subsequent reading.
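
With Redis as the Data Storage, this "newest first, limited history" layout maps naturally onto Redis lists. The following is a minimal sketch with StackExchange.Redis, not the actual RedisAdapter<T> code; the serialization step is assumed to happen elsewhere.

C#
using System;
using StackExchange.Redis;

public static class NewestFirstStorageSketch
{
    // Push the newest serialized object to the head of the list for its key,
    // then trim the list so only the most recent 'historyDepth' entries survive.
    public static void Store(IDatabase db, string key, byte[] serializedDsData, int historyDepth)
    {
        db.ListLeftPush(key, serializedDsData);
        db.ListTrim(key, 0, historyDepth - 1);
    }

    // Read the latest 'count' values for a key; they come back newest first,
    // matching the layout shown in Figure 2.
    public static byte[][] ReadLatest(IDatabase db, string key, int count)
    {
        RedisValue[] values = db.ListRange(key, 0, count - 1);
        return Array.ConvertAll(values, v => (byte[])v);
    }
}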

Design

The system is designed bearing in mind the following objectives:

  • Provide a uniform infrastructure appropriate for small single-process solutions with in-process Data Storage and larger multi-service systems alike.
  • Allow easy replacement of the specific Data Streaming and/or Data Storage provider.
  • Provide a convenient starting point for adding horizontal scaling (scale-out) across several machines.
  • Develop an open system with the ability to add / remove services seamlessly.

The solution is described in the table below.

Table 1. Description of Folders and Projects in the Solution

  • Tests: Two test applications. FullFlowTest allows the developer to test the full data flow Data Simulation --> Writing to Data Streaming --> Reading from Data Streaming --> Writing to Data Storage --> Reading from Data Storage within a single application in a debug-friendly environment. RedisEventTest tests publishing and subscribing to Redis events.
  • Applications: The folder contains the DS simulator DataSourcesSimulator, the DataReceiver service that reads data from Data Streaming into Data Storage, the DataProcessor service for data processing, and the presentation applications Dashboard and NodeJsDataProcessor. All of them are C# console applications, except NodeJsDataProcessor, which is written in Node.js. The _RunMe application starts the entire work process.
  • Common: Basic common assemblies referenced across the entire solution.
  • Serializers: Assemblies providing standard serialization and deserialization procedures.
  • DataStorage: General data access layer hiding the details of the specific Data Storage from its user.
  • Factories: General factories for creation of concrete Data Streaming and Data Storage implementations according to the uniform interfaces.
  • Kafka: Concrete implementation of Data Streaming with a Kafka data producer and consumer. The Confluent.Kafka library (version 0.11.0.0) is used.
  • Redis: Concrete implementation of Data Storage with Redis. StackExchange.Redis (version 1.2.6.0) is used.

The system provides the following data flow:

  • Producing data to Data Streaming
  • Consuming data from Data Streaming
  • Writing data to Data Storage
  • Reading data from Data Storage, and
  • Data processing

The activities related to Data Streaming and Data Storage are carried out by types implementing interfaces defined in the CommonInterfacesLib namespace.

The interface for the streaming producer has a single method to send an array of (string key -> T value) pairs to the Data Streaming component:

C#
public interface IStreamingProducer<T> : IDisposable
{
    void SendDataPairs(params KeyValuePair<string, T>[] arr);
}

This can be done either synchronously or asynchronously, depending on the concrete implementation. In our system, the interface is implemented by type KafkaProducer<T> in namespace KafkaProducerLib. Its constructor receives arguments for connection to Data Streaming, topic and partition, and an error handling delegate. The constructor launches periodic dequeueing of KeyValuePair<string, T> pairs and writing them to Data Streaming using the ThreadProcessing type. Method SendDataPairs() enqueues the pairs and triggers the writing process.
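
The enqueue-and-flush pattern behind this description can be sketched as follows. This is not the actual KafkaProducer<T> code: the real class sends the pairs to Kafka via Confluent.Kafka and uses the ThreadProcessing type, whereas here the transport is abstracted into a delegate so that the pattern stays visible.

C#
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Sketch of the queue-based producer pattern; the send transport is injected.
public sealed class QueuedProducer<T> : IStreamingProducer<T>
{
    private readonly ConcurrentQueue<KeyValuePair<string, T>> _queue =
        new ConcurrentQueue<KeyValuePair<string, T>>();
    private readonly AutoResetEvent _signal = new AutoResetEvent(false);
    private readonly Action<KeyValuePair<string, T>> _send;
    private volatile bool _stopped;

    public QueuedProducer(Action<KeyValuePair<string, T>> send)
    {
        _send = send;

        // Background loop: wait until new pairs are enqueued, then flush them
        // to the Data Streaming component via the injected delegate.
        Task.Run(() =>
        {
            while (!_stopped)
            {
                _signal.WaitOne(TimeSpan.FromMilliseconds(500));
                KeyValuePair<string, T> pair;
                while (_queue.TryDequeue(out pair))
                    _send(pair);
            }
        });
    }

    public void SendDataPairs(params KeyValuePair<string, T>[] arr)
    {
        foreach (KeyValuePair<string, T> pair in arr)
            _queue.Enqueue(pair);
        _signal.Set();   // trigger the writing process
    }

    public void Dispose()
    {
        _stopped = true;
        _signal.Set();
    }
}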

The interface for the streaming consumer also has a single method, which starts asynchronous data consuming:

C#
public interface IStreamingConsumer : IDisposable
{
    void StartConsuming();
}

Type KafkaConsumer<T> implements the interface.
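
Its general pattern is a mirror image of the producer: StartConsuming() launches a background loop that reads messages from Data Streaming and hands each of them to a message processor supplied by the caller. The sketch below abstracts the Kafka polling call into a delegate; the real KafkaConsumer<T> wraps the Confluent.Kafka consumer.

C#
using System;
using System.Threading;
using System.Threading.Tasks;

// Sketch of the polling-consumer pattern; the poll call and the message
// processor are both injected as delegates.
public sealed class PollingConsumer<T> : IStreamingConsumer where T : class
{
    private readonly Func<T> _poll;              // stand-in for the Kafka poll call
    private readonly Action<T> _messageProcessor;
    private readonly CancellationTokenSource _cts = new CancellationTokenSource();

    public PollingConsumer(Func<T> poll, Action<T> messageProcessor)
    {
        _poll = poll;
        _messageProcessor = messageProcessor;
    }

    public void StartConsuming()
    {
        Task.Run(() =>
        {
            while (!_cts.IsCancellationRequested)
            {
                T message = _poll();             // blocks or returns null when no data
                if (message != null)
                    _messageProcessor(message);  // e.g., enqueue into StorageWriter<T>
            }
        });
    }

    public void Dispose()
    {
        _cts.Cancel();
        _cts.Dispose();
    }
}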

The mechanism of writing to and reading from Data Storage is defined with a single interface, IDataStore<T> : IDisposable.

C#
public interface IDataStore<T> : IDisposable
{
    // Add value to a key and trim historical data up to given trim length 
    void Add(string key, T t, int trimLength = -1);

    // Get values for the keys within given historical limits
    Dictionary<string, IList<T>> 
       Get(ICollection<string> keys, long start = 0, long stop = -1);

    // Get values for a single key within given historical limits
    IList<T> Get(string key, long start = 0, long stop = -1);
	
    // Trim historical data for the keys up to given trim length
    void Trim(ICollection<string> keys, int length);

    // Delete all values for the keys
    void Delete(ICollection<string> keys);

    // Publish events asynchronously
    Task PublishAsync(string channelName, string message);
	
    // Subscribe for event
    void Subscribe(string channelPattern, Action<string> callback);

    // Get average duration of the above operations for diagnostic purposes
    Dictionary<string, TimeSpan> OperationsAverageDuration { get; }
}

The above interface supports adding data to Data Storage, trimming historical data, and getting data by either a single key or a collection of keys. It also allows publishing and subscribing to Data Storage events - a useful feature provided by Redis. Type RedisAdapter<T> implements the interface.
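
A brief usage sketch of this interface is shown below. The store instance would normally be obtained through the DataStore factory described next, and the key and channel names are only examples.

C#
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class DataStoreUsageSketch
{
    public static async Task Demo(IDataStore<string> store)
    {
        // Add a value for data source "1" and keep at most 100 historical entries.
        store.Add("1", "2018-02-15 10:00:00;0.5;;;1.7;values 0 and 3 written", trimLength: 100);

        // Read the 10 most recent values for that key (newest first).
        IList<string> latest = store.Get("1", start: 0, stop: 9);
        Console.WriteLine("Got {0} record(s)", latest.Count);

        // Publish / subscribe to Data Storage events (backed by Redis pub/sub).
        store.Subscribe("config:*", msg => Console.WriteLine("Event received: " + msg));
        await store.PublishAsync("config:update", "reload");

        // Diagnostics: average duration of the storage operations performed so far.
        foreach (KeyValuePair<string, TimeSpan> op in store.OperationsAverageDuration)
            Console.WriteLine("{0}: {1:F3} ms", op.Key, op.Value.TotalMilliseconds);
    }
}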

As you can see, the interfaces for Data Streaming and Data Storage are very basic and not demanding. Their concrete implementations deal with the specific products Kafka and Redis, respectively. Factory types StreamingProducer, StreamingConsumer and DataStore decouple the general interfaces from their concrete implementations, which means that different implementations of the interfaces may be instantiated with the appropriate factory type.

It is convenient to begin acquaintance with the system with the FullFlowTest application. The application demonstrates all phases of data travelling from DS to data processing in a debug-friendly environment. Static method StreamingProducer.Create<DSData>() of the StreamingProducerFactoryLib.StreamingProducer type creates a DSData producer object streamingDataProducer to simulate data generation by a DS. Its method SendDataPairs() asynchronously sends an array of KeyValuePair<string, DSData> key -> value pairs (in this test case, just a single pair).

Then class StorageWriter<T> is instantiated, producing object dataStorageWriter. This object is responsible for the insertion of data into Data Storage. An appropriate periodic process is started by its constructor. The constructor receives all necessary parameters as its arguments, including the address and port of the storage, the writing period, a data filter and error handling methods. Type parameter T defines the format of the stored data and in the sample may be either byte[] or string. The latter is easier to read by various kinds of readers (particularly the Node.js reader). StorageWriter<T> works as follows: its constructor starts a periodic process that dequeues data from an internal queue and writes them to Data Storage in a different thread, while the Data Streaming consumer (described below) enqueues data read from Data Streaming into that queue.

The Data Streaming consumer object streamingDataConsumer is created by static method StreamingConsumer.Create<byte[]>(), which receives the message processor as an argument, and its method StartConsuming() launches reading from Data Streaming. The message processor in our sample simply enqueues messages into the Data Storage writer queue, but it may also be assigned filtering and pre-processing tasks.
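
Condensed into code, the FullFlowTest chain looks roughly like the sketch below. It is written against the uniform interfaces only; the factory arguments, connection settings and the exact StorageWriter<T> wiring are simplified away, and the DSData member names follow the illustrative sketch from the Data Model section.

C#
using System;
using System.Collections.Generic;

public static class FullFlowSketch
{
    // streamingDataProducer comes from StreamingProducer.Create<DSData>(),
    // streamingDataConsumer from StreamingConsumer.Create<byte[]>() constructed with a
    // message processor that enqueues messages into the StorageWriter<T> instance,
    // and dataStore is used here only for the final read-back.
    public static void Run(
        IStreamingProducer<DSData> streamingDataProducer,
        IStreamingConsumer streamingDataConsumer,
        IDataStore<byte[]> dataStore)
    {
        // 1. Simulate a data source: send one key -> DSData pair to Data Streaming.
        streamingDataProducer.SendDataPairs(new KeyValuePair<string, DSData>(
            "1",
            new DSData { Timestamp = DateTime.UtcNow, Metrics = new[] { 0.5f, 1.7f }, Description = "test" }));

        // 2. Start consuming: the message processor supplied at creation time enqueues
        //    every message into StorageWriter<T>, which writes it to Data Storage.
        streamingDataConsumer.StartConsuming();

        // 3. Periodically read the stored data back and display them
        //    (FullFlowTest loops here until a key is pressed).
        IList<byte[]> latest = dataStore.Get("1", start: 0, stop: 0);
        Console.WriteLine("Read {0} record(s) from Data Storage", latest.Count);
    }
}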

Code Sample

Prerequisites

The first step is installation of ZooKeeper, Kafka and Redis. Simple yet comprehensive guidance for installation of ZooKeeper and Kafka on Windows may be found, e.g., here (Zookeeper and Kafka). Redis, as either an msi or a zip file, may be downloaded from here (Redis). Version Redis-x64-3.2.100 is used.

Elements of the ZKR stack are installed as Windows services but may also be run as console applications. For simplicity, in our code sample we will run them as console applications using the command files 1 - Start_Zookeeper.cmd, 2 - Start_Kafka.cmd and Start_Redis.cmd (the numbers in the file names indicate the order in which they are started).

A note on running Redis: by default, Redis runs in so-called protected mode, permitting only local calls to it. To allow network calls, the Redis server should be started with protected mode disabled:

redis-server.exe --protected-mode no

This is how Redis starts from our command file.

The NodeJsDataProcessor project is developed in Node.js. So if you want to run it, Node.js should be installed, e.g., following this guidance. It would also be useful to install the Node.js Development Tools for Visual Studio from here.

Some more handy tips that I found empirically while playing with the sample. An update of Java on your computer may change the environment variable JAVA_HOME used by the ZKR stack elements, and they may cease to operate. ZooKeeper should be started before Kafka. Before every start of Kafka (at least as a console application), the folders containing Kafka logs and ZooKeeper data (in my installation, kafka_2.11-0.11.0.1\kafka-logs and kafka_2.11-0.11.0.1\zookeeper-data) should be deleted from the Kafka installation folder (in my case, C:\kafka_2.11-0.11.0.1).
Note: when you run the sample applications as described below, you need not worry about these complications since this operation is performed by the samples themselves.

After successful installation of the ZKR stack elements, the referenced assemblies dealing with ZKR have to be installed. This may be carried out using NuGet. The required packages are included in the packages.config files of the appropriate projects. These are the following components:

  • for Kafka: Confluent.Kafka.0.11.0 and librdkafka.redist.0.11.0
  • for Redis: StackExchange.Redis.1.2.6

Running Software

After having installed the required packages and built the solution, you may run the applications. It is suggested to begin with the test application FullFlowTest. The application first starts all elements of the ZKR stack as three console applications (you will notice three additional console windows appearing on the screen), and then performs all steps of the data flow chain:

Data Simulation --> Writing to Data Streaming --> Reading from Data Streaming --> Writing to Data Storage --> Reading from Data Storage

All of the above operations except the last one are performed once; reading from Data Storage is performed periodically. So the application repeatedly reads and displays the same DSData object as a delimited string. Starting with the second read, the string is preceded with "?" indicating that the data are old (indeed, the same object with a fixed timestamp is read). To quit the application, press any key and kill all ZKR windows.

You may proceed with the next small test application, RedisEventTest. It demonstrates the usage of Redis events, which are a very handy inter-process synchronization tool. The application first starts Redis and then invites the user to publish an event. The result appears on the screen.

Finally, the main data flow may be tested by starting the _RunMe application from the Applications folder. It starts all ZKR applications, then DataProcessor, two DataReceivers, Dashboard, NodeJsDataProcessor and finally DataSourcesSimulator. _RunMe configures all started applications (except DataSourcesSimulator, which configures itself) via Data Storage (Redis): it writes configuration data to the Storage under keys the applications are aware of, and the applications read them (a sketch of one possible configuration handshake is shown after Figure 3). According to the default configuration, DataSourcesSimulator generates and writes to Data Streaming (Kafka) 1000 DSData objects with an array of 4 float values every second. These data are evenly distributed between the two DataReceivers, which read them from Data Streaming and write them to Data Storage. The first DataReceiver stores data as byte arrays and the second one as strings. DataProcessor, Dashboard and NodeJsDataProcessor read data from Data Storage. Dashboard visualizes data from one DS (in our case, a sine function simulated by DataSourcesSimulator).


Figure 3. Data Visualization with Dashboard Application.
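
Returning to the configuration step performed by _RunMe: one possible shape of this configuration handshake, expressed with the IDataStore<T> interface from above, is sketched below. The key names and payload format are invented for the illustration and are not the actual _RunMe convention.

C#
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ConfigurationViaStorageSketch
{
    // _RunMe side: store the configuration under a key the service knows about
    // and announce the change with a Data Storage event.
    public static async Task PublishConfiguration(IDataStore<string> store, string serviceName, string config)
    {
        store.Add("config:" + serviceName, config);
        await store.PublishAsync("config:changed", serviceName);
    }

    // Service side: read the newest configuration entry written for this service.
    public static string ReadOwnConfiguration(IDataStore<string> store, string serviceName)
    {
        IList<string> entries = store.Get("config:" + serviceName, start: 0, stop: 0);
        return entries.Count > 0 ? entries[0] : null;
    }
}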

The very simple (a few lines of code) NodeJsDataProcessor application, written in Node.js, illustrates the possibility of extending the system with services developed in different languages by various vendors.

Some Measurements and Discussion

While running the sample, some time measurements were performed. A Dell laptop (CPU i7-7600U, RAM 16 GB, 2.90 GHz; Windows 10 Enterprise) was used for local tests. An older laptop (CPU i7-3610QM, RAM 16 GB, 2.30 GHz; Windows 10 Pro) ran Redis in remote tests. A relatively slow home WiFi connection was used. It is interesting to measure the average duration of single-record operations such as writing to and reading from Redis. Some results are presented in the following table.

Table 2. Average Duration of Writing and Reading Operations with Redis (microseconds).

Operation    Write byte[]    Write string    Read byte[]    Read string
Local        140             140             120            320

Writing to Redis was tested in synchronous mode (CommandFlags.None in the RedisAdapter<T> class). Direct remote reading and writing take much longer (milliseconds). Massive remote reading and synchronous writing were not possible over the weak network (although asynchronous writing with CommandFlags.FireAndForget was successful for this example).
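
For reference, the difference between the two write modes at the StackExchange.Redis level can be sketched as follows (this is an illustration, not the RedisAdapter<T> code):

C#
using StackExchange.Redis;

public static class WriteModeSketch
{
    // CommandFlags.None: the call waits for Redis to acknowledge the write.
    public static void WriteSynchronously(IDatabase db, string key, byte[] value)
    {
        db.ListLeftPush(key, value, flags: CommandFlags.None);
    }

    // CommandFlags.FireAndForget: the command is sent without waiting for a reply,
    // which is what made massive remote writing feasible over the weak network.
    public static void WriteFireAndForget(IDatabase db, string key, byte[] value)
    {
        db.ListLeftPush(key, value, flags: CommandFlags.FireAndForget);
    }
}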

These rough measurements provide some clues for the design of the system layout. When Redis is installed as a local application, services running on different machines can asynchronously write data to it, whereas reading is better done locally. In the case of extensive reading, it is probably better to store data as arrays of bytes, but the possibility of reading these data with different readers in different programming languages should be kept in mind. It is a good idea to preprocess data before writing to Data Storage when possible. Wise grouping of stored data in order to optimize their subsequent reading should not be underestimated. And if remote data reading is absolutely necessary, then it is probably better to first read the data locally and then transfer them to the client machine by other communication means. A Redis cluster should be evaluated for remote operations.

So it is clear that the performance of Data Storage, particularly for remote reading, should be improved for real-world applications. Such enhancements may imply, e.g., usage of the Redis pipelining technique, which allows multiple commands to be sent at once. Usage of a Redis cluster may be considered to mitigate remote call problems. The redis-benchmark utility may be used for more accurate measurements. It is quite possible that a skilled Redis expert would immediately suggest how to improve performance.
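
As an illustration of the pipelining idea, StackExchange.Redis can buffer several commands in a batch and send them to Redis together. A minimal sketch (not part of the sample code):

C#
using System.Threading.Tasks;
using StackExchange.Redis;

public static class PipeliningSketch
{
    // Buffer several list writes in a batch, flush them to Redis at once,
    // then await all replies together.
    public static async Task WriteBatchAsync(IDatabase db, string key, byte[][] values)
    {
        IBatch batch = db.CreateBatch();
        var pending = new Task[values.Length];
        for (int i = 0; i < values.Length; i++)
            pending[i] = batch.ListLeftPushAsync(key, values[i]);

        batch.Execute();                 // send all buffered commands in one go
        await Task.WhenAll(pending);     // wait for the replies
    }
}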

Conclusions

The simple infrastructure for data acquisition and processing presented in the article allows developers to build a range of software, from a single-process application to a complex distributed multi-service system. The sample in the article deals with the ZooKeeper - Kafka - Redis stack, but these products may be replaced with others without changing the basic infrastructure (e.g., RabbitMQ may be considered for Data Streaming, and Apache Ignite is a promising candidate for Data Storage). Moreover, in the simple case of a single-process system, Data Streaming and/or Data Storage may be developed "in house" using the same common interfaces. The next step can be the usage of distributed versions of Data Streaming and Data Storage; for example, it would be very interesting to use a distributed cluster installation of Redis. The sample presented in the article can be easily moved to .NET Core; the possibility of using the same third-party libraries, or finding other libraries compatible with .NET Core, remains to be checked. A cloud installation of the system would also be interesting to perform.

History

  • 15th February, 2018: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)