Here we walk through the process of connecting a streaming data source to the Kafka cluster set up in a previous article. We pick an open data set to work with, and then create a simple Python application to stream it into our HDInsight Kafka cluster.
The first article in this series explored the simple setup process for a Kafka cluster on Azure HDInsight.
This article explores the process of creating and deploying the Kafka producer, which will simulate a stream of data from factory sensors in a Python producer script. The simulated stream also allows for adjusting and resubmitting as needed.
The Producer
The first step in creating the application is to define the machine and sensor information. After this, the Kafka broker list and topic name will be configured. The broker names on Azure are accessible using a Secure Shell Protocol (SSH) connection to the cluster. See the previous article for this cluster’s SSH credentials.
Alternatively, the topic can be created using an SSH connection with PowerShell. While this demonstration will only overview the relevant steps, Microsoft provides a complete tutorial on using PowerShell for Azure.
To use this method, it’s necessary to first install the Az PowerShell module, which is already on the cloud shell accessible from the portal. If you’re using SSH from a local machine, ensure that PowerShell and the Az module have both been installed.
First, connect to the primary head node of the cluster. Replace CLUSTERNAME with the name of the Kafka cluster you created, and replace sshuser if the default was altered during the setup process:
ssh sshuser@CLUSTERNAME-ssh.azurehdinsight.net
If it’s not yet present, install the JSON processor (jq) on the primary head node. You may also need to install any Python libraries that the application requires:
sudo apt -y install jq
Next, store the Kafka cluster name in the CLUSTERNAME variable:
read -p "Enter the Kafka on HDInsight cluster name: " CLUSTERNAME
Then, set the KAFKAZKHOSTS environment variable to the ZooKeeper hosts:
export KAFKAZKHOSTS=`curl -sS -u admin -G https:
Now, enter the admin password created in the cluster deployment process.
Next, set the KAFKABROKERS environment variable to the Kafka broker hosts:
export KAFKABROKERS=`curl -sS -u admin -G https:
Then, display the broker list:
echo '$KAFKABROKERS='$KAFKABROKERS
This should return something similar to the following:
<brokername1>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092,<brokername2>.eahjefxxp1netdbyklgqj5y1ud.cx.internal.cloudapp.net:9092
Next, paste these values into the producer.py file where required.
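The broker list is a single comma-separated string of host:port pairs, so it can help to validate it before handing it to the producer. The following is a minimal sketch, not the contents of the actual producer.py. The helper name parse_broker_list and the example host names are hypothetical; kafkaBrokers is the name this article uses for the broker setting.

```python
from typing import List


def parse_broker_list(raw: str) -> List[str]:
    """Split a comma-separated 'host:port' string into a clean list."""
    brokers = [entry.strip() for entry in raw.split(",") if entry.strip()]
    for broker in brokers:
        # rpartition keeps everything before the last colon as the host.
        host, _, port = broker.rpartition(":")
        if not host or not port.isdigit():
            raise ValueError(f"Expected host:port, got {broker!r}")
    return brokers


# Placeholder host names; substitute the value echoed on your own cluster.
kafkaBrokers = parse_broker_list(
    "broker1.example.internal:9092, broker2.example.internal:9092"
)
```

A list in this form can be passed to a Kafka client as its bootstrap servers, and a malformed entry fails early instead of at connection time.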
This Quickstart article details how to create a topic in our Kafka cluster using the information provided above. Use the following command to do so:
/usr/hdp/current/kafka-broker/bin/kafka-topics.sh --create --replication-factor 3 --partitions 8 --topic SensorReadings --zookeeper $KAFKAZKHOSTS
Upload the Application
While there are various ways to upload our Python file, it’s generally most sensible to follow the path of least resistance. If not yet present, download the necessary files from GitHub.
In the shell, create the file using nano or another preferred editor. Then, copy and paste the contents of producer.py. Next, change the kafkaBrokers value to the value returned above. Save the file.
The application uses the kafka-python library. Install this as follows:
pip3 install kafka-python
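For orientation, sending a JSON record with kafka-python might look roughly like the sketch below. This is an illustration under assumptions, not the code in producer.py: the function names serialize, make_producer, and send_reading are hypothetical, and the topic name is the one created earlier in this article. KafkaProducer is imported lazily so the helpers can be tried without a reachable broker.

```python
import json

TOPIC = "SensorReadings"  # topic created earlier in this article


def serialize(reading: dict) -> bytes:
    """Encode one reading as UTF-8 JSON, the form shown in the consumer output."""
    return json.dumps(reading).encode("utf-8")


def make_producer(brokers):
    """Create a kafka-python producer for the given list of host:port strings."""
    # Imported here so the other helpers work without kafka-python or a broker.
    from kafka import KafkaProducer

    return KafkaProducer(bootstrap_servers=brokers, value_serializer=serialize)


def send_reading(producer, reading: dict, topic: str = TOPIC) -> None:
    """Queue one reading and block until it has been handed to the broker."""
    producer.send(topic, value=reading)
    producer.flush()
```

Flushing after every record keeps the sketch simple; a producer sending many readings per second would more likely flush once at the end of a run.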
Test the Producer
Run a quick test to confirm that everything is functioning properly by starting the Kafka command-line consumer script:
<PATH>/kafka-console-consumer.sh --topic SensorReadings --from-beginning --bootstrap-server <paste your broker list here>
The path at the time of writing was /usr/hdp/2.6.5.3033-1/kafka/bin/.
The consumer then waits for Kafka to send data.
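If you would rather watch the topic from Python than from the console script, a kafka-python consumer can play the same role. The sketch below is an assumption-laden alternative, not part of the downloaded files: decode_reading and consume are hypothetical names, and the broker list must be supplied by the caller.

```python
import json


def decode_reading(raw: bytes) -> dict:
    """Turn the UTF-8 JSON bytes of one message back into a dictionary."""
    return json.loads(raw.decode("utf-8"))


def consume(brokers, topic: str = "SensorReadings") -> None:
    """Print every reading on the topic, starting from the earliest offset."""
    # Imported here so decode_reading can be used without kafka-python installed.
    from kafka import KafkaConsumer

    consumer = KafkaConsumer(
        topic,
        bootstrap_servers=brokers,
        auto_offset_reset="earliest",  # similar in spirit to --from-beginning
        value_deserializer=decode_reading,
    )
    for message in consumer:  # blocks and waits for new messages
        print(message.value)
```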
Now, open a second SSH console and execute the producer:
python3 producer.py
If all goes well, the producer sends a short burst of data to Kafka. This data is visible in the consumer console.
{"sensor": "mass", "machine": "mixer", "units": "kg", "time": 1646861863942, "value": 144.174}
{"sensor": "rotation", "machine": "mixer", "units": "rpm", "time": 1646861863942, "value": 65.009}
{"sensor": "fillpressure", "machine": "mixer", "units": "kPa", "time": 1646861863942, "value": 575.347}
{"sensor": "bowltemp", "machine": "mixer", "units": "DegC", "time": 1646861863942, "value": 25.452}
{"sensor": "ovenspeed", "machine": "oven", "units": "m/s", "time": 1646861863942, "value": 0.203}
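Each record is a flat JSON object with five fields, and the time value is a Unix timestamp in milliseconds. A reading of this shape could be built as in the sketch below. The function name make_reading and the low and high bounds are hypothetical; the real producer.py may generate its values differently.

```python
import random
import time


def make_reading(sensor: str, machine: str, units: str, low: float, high: float) -> dict:
    """Build one simulated reading in the same shape as the records above."""
    return {
        "sensor": sensor,
        "machine": machine,
        "units": units,
        "time": int(time.time() * 1000),  # milliseconds since the Unix epoch
        "value": round(random.uniform(low, high), 3),  # random value in [low, high]
    }


# Example with made-up bounds for the mixer's mass sensor.
reading = make_reading("mass", "mixer", "kg", 100.0, 200.0)
```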
There are five command-line options used for generating the data in the desired form:
-l or --loop_count, followed by an integer, specifies the number of cycles to generate. The actual number of records generated depends on the other selected options.
-s or --sensor, followed by a sensor name, specifies which sensor to simulate. If using this option, there’s no need to use the machine name option, as the sensor name is unique. The machine name is purely informational.
-r or --rate, followed by a decimal, specifies the number of readings to send per second. The rate only sets the delay before sending the data on each cycle, so it’s not extremely accurate, but it’s sufficient for its purpose.
-m or --machine, followed by one of the machine names, specifies the machine to use. If used without the --sensor option, then all the sensors for that machine fire simultaneously.
-t or --time, followed by an integer, specifies the number of seconds to run. When used, the process ignores the loop count option.
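The options above map naturally onto argparse. The following sketch shows one way the parsing and the timing loop could fit together. It is not the code from producer.py: the default values, the function names build_parser and run, and the assumption that the delay equals one divided by the rate are all illustrative.

```python
import argparse
import time


def build_parser() -> argparse.ArgumentParser:
    """Declare the five options described above (defaults are assumptions)."""
    parser = argparse.ArgumentParser(description="Simulated sensor producer (sketch)")
    parser.add_argument("-l", "--loop_count", type=int, default=10,
                        help="number of cycles to generate")
    parser.add_argument("-s", "--sensor", help="name of the sensor to simulate")
    parser.add_argument("-r", "--rate", type=float, default=1.0,
                        help="readings to send per second")
    parser.add_argument("-m", "--machine", help="name of the machine to simulate")
    parser.add_argument("-t", "--time", type=int, default=None,
                        help="seconds to run; overrides --loop_count")
    return parser


def run(args, emit, sleep=time.sleep, clock=time.monotonic) -> int:
    """Call emit() once per cycle and return the number of cycles completed."""
    if args.rate <= 0:
        raise ValueError("--rate must be positive")
    delay = 1.0 / args.rate  # assumed relation between rate and delay
    sent = 0
    if args.time is not None:
        # Time-based run: the loop count is ignored.
        deadline = clock() + args.time
        while clock() < deadline:
            emit()
            sent += 1
            sleep(delay)
    else:
        for _ in range(args.loop_count):
            emit()
            sent += 1
            sleep(delay)
    return sent
```

Passing sleep and clock as parameters is only a convenience for trying the loop without waiting; a script would call run(build_parser().parse_args(), emit) with a function that sends one cycle of readings.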
These options can be combined to run the simulation, using the run-scenario.sh shell script as a starting point. This script executes the application many times in background mode, simulating the continual delivery of data as if from a working environment.
In the script, set the command-line options to provide the inputs as required by the scenario.
Executing the script as below gives a 25-second burst of data matching the scenario:
./run-scenario.sh 25
It does this by executing the following:
python3 producer.py -t 25 -r 0.2 -s mass &
python3 producer.py -t 25 -s rotation &
python3 producer.py -t 25 -r 10 -s FillPressure &
python3 producer.py -t 25 -r 10 -s BowlTemp &
python3 producer.py -t 25 -s OvenSpeed &
python3 producer.py -t 25 -r 1 -s ProveTemp &
python3 producer.py -t 25 -r 5 -s OvenTemp1 &
python3 producer.py -t 25 -r 5 -s OvenTemp2 &
python3 producer.py -t 25 -r 10 -s OvenTemp3 &
python3 producer.py -t 25 -r 2 -s CoolTemp1 &
python3 producer.py -t 25 -r 1 -s CoolTemp2 &
python3 producer.py -t 25 -s PackSpeed &
python3 producer.py -t 25 -r 10 -s PackCounter &
Running the script without any command-line parameters gives a 15-second burst. Any positive integer will work.
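The same scenario can also be expressed in Python, which may be handy when the set of sensors and rates needs to change often. The sketch below is a hypothetical equivalent of run-scenario.sh, not a file from the repository. The sensor names and rates are copied from the commands above; the names SCENARIO, build_commands, and launch are assumptions.

```python
import subprocess

# (sensor, rate) pairs from the scenario above; None means no -r option is passed.
SCENARIO = [
    ("mass", 0.2), ("rotation", None), ("FillPressure", 10), ("BowlTemp", 10),
    ("OvenSpeed", None), ("ProveTemp", 1), ("OvenTemp1", 5), ("OvenTemp2", 5),
    ("OvenTemp3", 10), ("CoolTemp1", 2), ("CoolTemp2", 1), ("PackSpeed", None),
    ("PackCounter", 10),
]


def build_commands(seconds: int = 15):
    """Return one producer.py argument list per sensor in the scenario."""
    if seconds <= 0:
        raise ValueError("seconds must be a positive integer")
    commands = []
    for sensor, rate in SCENARIO:
        cmd = ["python3", "producer.py", "-t", str(seconds)]
        if rate is not None:
            cmd += ["-r", str(rate)]
        cmd += ["-s", sensor]
        commands.append(cmd)
    return commands


def launch(commands):
    """Start every command in the background and return the process handles."""
    return [subprocess.Popen(cmd) for cmd in commands]
```

Calling launch(build_commands(25)) from the directory containing producer.py would start the thirteen producers in parallel, much as the trailing & does in the shell script.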
Conclusion
By now, it should be clear that using Kafka on HDInsight is essentially identical to using Kafka in-house. Whether Kafka runs standalone on an old Ubuntu PC, on a multi-server cluster in our server room, or on Azure HDInsight, the approach doesn’t change.
Additionally, there’s no difference between using this producer and streaming from a database. The process is the same for both on-site and Azure Kafka implementations.
Now, all that remains is to stream and analyze the data. To explore this process, continue to the final article of this series, which will use PySpark to do just that.
To learn more about streaming at scale in HDInsight, and Apache Kafka in Azure HDInsight, check out these Microsoft Azure Developer Community resources.