This is Part 1 of a three-part series that shows how to create an end-to-end streaming solution: streaming data into Azure Event Hubs, processing it with Azure Stream Analytics, and saving the results to an Azure SQL database.
Azure helps ingest, process, and store streaming data. It scales automatically as a company's data needs expand. Plus, Azure’s serverless options mean you don't need to spend time maintaining your servers and you pay only for what you use.
This article will demonstrate writing a Python app to ingest weather data into Azure using an Azure Event Hub. Later articles in this three-part series will process the data using Azure Stream Analytics, store it in an Azure SQL database, and then analyze and visualize the data using Power BI.
Following this tutorial requires an Azure subscription. Sign up for free and get a $200 credit to use popular services.
Considering an Azure Event Hub
Streaming data into Azure requires an Event Hub to receive the incoming data. Event Hubs uses Advanced Message Queuing Protocol (AMQP), an open standard for communicating data between applications. Since it’s an open standard, many applications and devices can stream to it out of the box. Event Hubs also supports HTTPS and Apache Kafka.
Event Hub can then broadcast these messages to other connected applications for further processing and storage. Each Event Hub can handle tens of thousands of messages per second.
In the real world, Internet of Things (IoT) devices, connected vehicles, or other applications often stream events directly into Event Hubs without an intermediate application. However, since there aren’t any IoT devices available for this tutorial, a Python application will generate the stream instead.
Creating an Event Hubs Namespace
Creating an Event Hub first requires a namespace. The namespace is a container for Event Hubs that manages access, scaling, and a schema registry.
Enter the Azure portal, search for “Event Hubs,” and create a new namespace. Then, pick a subscription and resource group. Creating a new resource group makes it easy to delete everything later by simply deleting the resource group. So, create a resource group with the name data-streaming-example.
Next, pick a namespace name, such as data-streaming-evhub. Pick a nearby location, for example, West Europe.
Select the Basic pricing tier, which should be enough for this example. This tier provides one consumer group (the default one) and 100 brokered connections, meaning one downstream application can consume events and up to 100 clients can connect simultaneously to send or receive messages. Notably, messages are retained for only one day.
The Standard tier provides 20 consumer groups and 1,000 brokered connections and stores messages for up to seven days. This tier can also capture events and write them to a blob in a storage account. Additionally, it provides a schema registry for structured data messages that can be shared across applications.
The Premium tier offers the same features as the Standard tier in greater quantity. The Standard tier is about twice as expensive as the Basic tier, and the Premium tier is about 34 times as expensive as the Standard tier (although users don’t pay per message, so the difference depends on use).
Finally, select Throughput Units representing Event Hub capacity. A single throughput unit can ingress up to 1 MB per second or 1,000 events per second (whichever comes first) and egress up to 2 MB per second or 4,096 events per second. The Standard tier lets users automatically scale the number of throughput units by enabling auto-inflate.
The Create Namespace blade should look like this:
Now, Review + create the Event Hubs namespace.
Alternatively, create the namespace using the Azure CLI with these commands:
az group create --name data-streaming-example --location "West Europe"
az eventhubs namespace create --resource-group data-streaming-example
--name data-streaming-evhub --sku Basic --location "West Europe"
These commands create the resource group and the namespace.
Creating an Event Hub
Now that there is a namespace, create the actual Event Hub. The Event Hub is a channel that receives messages and delivers them to downstream applications.
To create an Event Hub, go into the namespace and click the + Event Hub button at the top. Then, name the Event Hub — for example, weather — to stream weather data.
The partition count determines how messages are organized for downstream parallelism, so the more parallel readers the application is likely to have, the more partitions it needs. Keep the default of two in this case.
Basic tier users can’t change the message retention or enable Capture, so go ahead and Create the Event Hub.
Alternatively, to create the Event Hub using the Azure CLI, use these commands:
az eventhubs eventhub create --resource-group data-streaming-example
--namespace-name data-streaming-evhub --name weather --message-retention 1 --partition-count 2
The default message retention isn’t valid for the Basic tier, so explicitly set that to 1. Also, note the default partition count is four when using the CLI, versus two when using the portal.
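The partition count chosen above matters once downstream readers consume in parallel. If per-city ordering ever matters, the producer can also route related events to the same partition with a partition key. Below is a minimal sketch, assuming the azure-eventhub package installed later in this article and placeholder connection details:

from azure.eventhub import EventHubProducerClient, EventData

# Placeholder values; replace with the real namespace connection string
connection_str = '<Your connection string>'
eventhub_name = 'weather'

client = EventHubProducerClient.from_connection_string(
    connection_str, eventhub_name=eventhub_name)

# Every event added to a batch created with a partition_key lands on the
# same partition, so a downstream reader sees those events in order
batch = client.create_batch(partition_key='Portland')
batch.add(EventData('{"city": "Portland", "description": "sky is clear"}'))
client.send_batch(batch)
client.close()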
Choosing a Data Set
The Event Hub is now in place. Next, get a data set. This data would usually come from IoT devices, connected cars, or other devices. However, since that type of data is not available in this tutorial, use an open data set, like the weather_description.csv file from a Kaggle set.
The CSV file has an hourly weather description (such as clear skies, overcast, fog, etc.) for various cities in the US and abroad. The cities are columns in the file and the rows contain a date and time, as well as each city's description.
Because the data likely came from multiple weather stations, the application will break the columns into rows: 36 cities yield 36 rows per hour, so the file’s more than 45,000 rows become over 1.5 million streamed messages (which takes a while). This transformation happens in the Python application.
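To confirm that structure before streaming anything, a quick standard-library check (assuming weather_description.csv already sits in the working directory) lists the city columns and counts the rows:

from csv import reader

with open('weather_description.csv', 'r') as f:
    csv_reader = reader(f)
    header = next(csv_reader)   # first row: datetime plus one column per city
    cities = header[1:]         # drop the datetime column
    row_count = sum(1 for _ in csv_reader)

print(f"{len(cities)} cities, for example: {cities[:3]}")
print(f"{row_count} hourly rows, {row_count * len(cities)} messages after reshaping")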
Streaming into Azure
It’s time to get to coding. Ensure Python is on the local machine. Also, create a file named index.py and place weather_description.csv in the same folder.
Additionally, this project requires a package to send messages to the Azure Event Hub. Use the azure-eventhub package (here, v5.7.0), which uses the AMQP protocol.
pip install azure-eventhub==5.7.0 --user
The complete application is only about 45 lines of code:
import time
import json
from csv import reader
from azure.eventhub import EventHubProducerClient, EventData

connection_str = '<Your connection string>'
eventhub_name = 'weather'


class Weather:
    def __init__(self, date, city, description):
        self.date = date
        self.city = city
        self.description = description

    def __str__(self):
        return f"{self.date}, {self.city}, {self.description}"


def send_to_eventhub(client, data):
    # Create a batch, add the message to it, and send the batch to the Event Hub
    event_data_batch = client.create_batch()
    event_data_batch.add(EventData(data))
    client.send_batch(event_data_batch)


def main():
    with open('weather_description.csv', 'r') as weather_descriptions:
        csv_reader = reader(weather_descriptions)
        # The header row holds the city names; drop the newline and the datetime column
        header = next(weather_descriptions)
        cities = header[:-1].split(',')[1:]
        client = EventHubProducerClient.from_connection_string(
            connection_str, eventhub_name=eventhub_name)
        for row in csv_reader:
            date = row[0]
            i = 1
            # Turn each city column into its own message
            for city in cities:
                weather = Weather(date, city, row[i])
                send_to_eventhub(client, json.dumps(weather.__dict__))
                print(json.dumps(weather.__dict__))
                i += 1
            time.sleep(3)


main()
This code starts with some imports, the connection string, and the Event Hub name. The connection string for the Event Hubs namespace is in the Azure portal, in the namespace under Shared access policies. Click RootManageSharedAccessKey to find a primary key, a secondary key, and a connection string for each. Copy one connection string and paste it into the application.
Alternatively, use the Azure CLI to get the connection string using this command:
az eventhubs namespace authorization-rule keys list --resource-group data-streaming-example
--namespace-name data-streaming-evhub --name RootManageSharedAccessKey
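Rather than hard-coding the connection string in index.py, it can also be read from the environment. A minimal sketch, using a hypothetical variable name EVENTHUB_CONNECTION_STR:

import os

# EVENTHUB_CONNECTION_STR is a hypothetical name; set it in the shell first,
# for example: export EVENTHUB_CONNECTION_STR="Endpoint=sb://..."
connection_str = os.environ['EVENTHUB_CONNECTION_STR']
eventhub_name = 'weather'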
Then comes the send_to_eventhub function. First, the code creates a data batch using an EventHubProducerClient, which is passed in as an argument. Next, it adds the data to the batch and sends the batch. That piece of code is pretty self-explanatory.
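Creating a batch per message keeps the example easy to follow, but a batch can hold many events. If throughput ever becomes a concern, one variation is to pack a whole row’s 36 city messages into as few batches as possible. The sketch below shows a hypothetical helper, send_many_to_eventhub, built on the same client; add raises a ValueError once a batch is full, which signals when to send and start a new one.

def send_many_to_eventhub(client, messages):
    # Hypothetical helper: pack messages into batches, sending a batch and
    # starting a new one whenever the current batch runs out of room
    event_data_batch = client.create_batch()
    added_any = False
    for message in messages:
        try:
            event_data_batch.add(EventData(message))
        except ValueError:
            client.send_batch(event_data_batch)
            event_data_batch = client.create_batch()
            event_data_batch.add(EventData(message))
        added_any = True
    if added_any:
        client.send_batch(event_data_batch)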
The main function is a bit more challenging to read, mainly because it changes columns into rows. The code opens the CSV file and gets the header. Then it splits the header at the commas and retrieves all values except for the first (the datetime column). There is now an array containing all city names.
Then, the code loops through the rows in the CSV file. The first value of each row is the datetime, so the code grabs it. Next, for each row, the code loops through the cities, matching row index i to each city (the index starts at 1 because index 0 holds the datetime).
The application next creates a Weather object from the data and turns it into a JSON string, which makes the data easier to process later. After sending all the cities for a row, the application waits three seconds, then sends the next row.
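The manual index i works, but pairing each city with its value can also be written with zip, which reads a little more directly. A small sketch of the inner loop rewritten that way, reusing the same Weather class and send_to_eventhub helper:

for row in csv_reader:
    date, descriptions = row[0], row[1:]
    # zip pairs each city name with the matching description in the row
    for city, description in zip(cities, descriptions):
        weather = Weather(date, city, description)
        send_to_eventhub(client, json.dumps(weather.__dict__))
    time.sleep(3)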
Testing the Application
Testing is straightforward. Just start the application and watch it send messages to the Azure Event Hub. The print statement in the code shows each message the application just sent.
In Azure, go to the namespace and find the Event Hubs. In the Event Hubs overview, click the weather Event Hub. The messages should come pouring in (with a slight delay).
If the data isn’t visible at first, wait a minute or two for requests to appear. If they still don’t show up, there is likely an issue with the code; ensure the connection string is valid.
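To verify the stream from code instead of the portal, a small consumer can print incoming events. Below is a minimal sketch, assuming the same connection string and the default consumer group ($Default, the only one available on the Basic tier):

from azure.eventhub import EventHubConsumerClient

connection_str = '<Your connection string>'
eventhub_name = 'weather'

def on_event(partition_context, event):
    # Print each event body as it arrives
    print(event.body_as_str())

consumer = EventHubConsumerClient.from_connection_string(
    connection_str, consumer_group='$Default', eventhub_name=eventhub_name)

with consumer:
    # Blocks and keeps receiving until interrupted with Ctrl+C
    consumer.receive(on_event=on_event, starting_position="-1")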
Next Steps
Azure has many benefits when working with data, like automatic scaling and paying only for what you use. Your services can also connect to various other services, both in Azure and on-premises.
Users can set up an Azure Event Hub namespace and an Event Hub using either the Azure portal or the Azure CLI. Then, that Event Hub can receive information from a data source, such as a connected vehicle or IoT device.
Read the following article in this three-part series to learn how to process all that data by connecting the Event Hub to Azure Stream Analytics.
To learn more about how to connect to your data sources, visualize and discover what’s important, and share that with your team, check out our Reactor YouTube video, Introduction into Data Analysis using Power BI.