
Streaming at Scale with Azure Event Hubs, Azure Functions, and Cosmos DB Part 1: Getting Streaming Data into Azure

23 Mar 2022
Stream data to an Event Hub
This is Part 1 of a 3-part series that explores how to create an end-to-end streaming solution: streaming data into Azure Event Hubs, processing it with Azure Functions in Java, and piping the results into a Cosmos DB database. This article shows how to subscribe to a real-time Bitcoin-to-USD price WebSocket, then forward the messages to an Azure Event Hub.

In this three-part series, we’ll create an end-to-end streaming solution. We’ll start by streaming data into Azure Event Hubs, then process it with Azure Functions in Java and pipe the results into a Cosmos DB database. Finally, we’ll analyze the data in Power BI.

We’ll subscribe to a data stream via a WebSocket in this first article. Then, we’ll process that stream in Java to push the data into an Azure Event Hub. We’ll use a real-time cryptocurrency feed dataset that shows the changing price of Bitcoin relative to US dollars.

The full code is available on GitHub.

Connecting to the Blockchain Exchange WebSocket

To follow along, sign up to the Blockchain Exchange for free. Use the API documentation to set up your API key and get familiar with the WebSocket endpoint.

The first step in sending our data to Azure Event Hubs is connecting to our source data stream. We'll create a simple Java project to handle the data.

In Visual Studio Code (VSCode), create a boilerplate Java application by installing the Java Extension Pack, opening the Command Palette (Ctrl + Shift + P), and selecting Create Java Project. Then, follow the prompts and select Maven for dependency management.

When your application is set up, you have a Java project with a default class and a main function. There’s also a pom.xml file to host the Maven dependencies.

Initially, we only need one dependency to connect to the WebSocket: the nv-websocket-client library (com.neovisionaries:nv-websocket-client). Be sure to add it to the pom.xml file.

We need to create a WebSocket object in our main() function:

Java
// URL holds the Blockchain Exchange WebSocket endpoint from the API documentation
WebSocket ws = new WebSocketFactory().createSocket(URL);

Then we need to add the necessary headers mentioned in the Blockchain Exchange API documentation:

Java
// Add required header for connection
ws.addHeader("Origin","https://exchange.blockchain.com");

We also need to add a listener before connecting to the WebSocket. The listener will override the onTextMessage() function provided by the WebSocketAdapter class. This function fires every time our WebSocket receives a text message so that we can process it.

To begin, we’ll print the message to confirm it's working.

Java
ws.addListener(new WebSocketAdapter() {
   @Override
   public void onTextMessage(WebSocket websocket, String message) throws Exception {
       // Received a text message, so let's start by printing it out
       System.out.println(message);
   }
});

Then, we’ll connect to the WebSocket and send the subscribe message as the API documentation specifies. This message tells the Blockchain Exchange that we’d like to subscribe to the BTC-USD prices channel.

Java
try {
    ws.connect();
    // subscribe to the prices channel for BTC-USD price data
    ws.sendText("{\"token\": \"" + authToken + "\", "
        + "\"action\": \"subscribe\", \"channel\": \"prices\", "
        + "\"symbol\": \"BTC-USD\", \"granularity\": 60}");
} catch (WebSocketException e) {
    e.printStackTrace();
} 

The screenshot below shows the WebSocket outputting messages to the console. So, our listener is receiving messages correctly.

Since data is streaming in from the WebSocket, we now want to pass that data into an Azure Event Hub.

Creating an Azure Event Hub

To begin, ensure you have an Azure account or create one for free. Once in the Azure portal, search for “Event Hub” and select Event Hubs under Services.

Next, create a namespace in the Event Hubs service area. The namespace will hold all our Event Hub project’s resources, as the screenshot below shows. Provide a Subscription and a Resource group, clicking Create new if you don’t have a resource group already.

Fill in the remaining details to provide your Namespace name and select a suitable Location and Pricing tier. For now, we set up the namespace with the minimum number of throughput units. We can scale up later if we need to ingest more than 1 MB of data or 1,000 messages per second, which is the capacity of a single throughput unit.

Click Review + create to finalize the namespace. When it finishes, it will appear in the list of namespaces.

Now that we’ve created the namespace, we can build our Event Hub. Click the namespace and Create Event Hub.

The Event Hub requires a Name, Partition Count, and Message Retention value. You can’t change the partition count later, so take care when choosing this value. The more partitions, the higher the possible throughput, because messages are distributed across them, which lets you scale the number of downstream processors to keep up with demand. Choose a partition count that reflects what your application requires at its peak load.

We need two partitions for this project.

The message retention value sets how long the messages will be available (in days). It guarantees that the messages will exist for at least this amount of time. However, that doesn’t necessarily mean they will disappear as soon as the time has expired.

Let’s set a message retention value of 1 for this project.

Connecting to the Event Hub

Once the Event Hub is ready, we need to configure access so our application can connect.

First, click the Connect button or select Shared access policies in the Event Hub’s Settings menu. Create a policy by giving it a name and ticking the Manage, Send, and Listen boxes. When you select this policy from the list, your credentials appear, as in the image below. These credentials allow our Java application to connect to the Event Hub and send messages.

Now our Event Hub is ready to receive messages. Next, we’ll modify our Java app’s listener to pass messages from the Blockchain Exchange WebSocket to the Event Hub via HTTPS. You could use the Azure Event Hubs client library to do this, but we'll use the Event Hubs REST API.
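For reference, a rough sketch of the client-library route looks like the snippet below. It assumes the com.azure:azure-messaging-eventhubs dependency and a connection string copied from the shared access policy we just created; the event hub name and environment variable are placeholders, not part of this article’s code.

Java
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventDataBatch;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerClient;

public class SdkSenderSketch {
    public static void main(String[] args) {
        // Connection string from the shared access policy; hub name as created earlier
        String connectionString = System.getenv("EVENT_HUB_CONNECTION_STRING");
        String eventHubName = "blockchain-usd-btc-price";

        EventHubProducerClient producer = new EventHubClientBuilder()
                .connectionString(connectionString, eventHubName)
                .buildProducerClient();

        // Batch a couple of sample messages and send them in a single call
        EventDataBatch batch = producer.createBatch();
        batch.tryAdd(new EventData("{\"price\": 42000.0}"));
        batch.tryAdd(new EventData("{\"price\": 42001.5}"));
        producer.send(batch);
        producer.close();
    }
}

The rest of this article sticks with the REST API, which keeps the dependency list short and makes the HTTP mechanics explicit.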

We only need to modify our onTextMessage function that previously printed our Blockchain Exchange message to the screen.

First, we need an authorization token. Microsoft’s documentation provides a sample helper function for generating a shared access signature (SAS) token. We just supply our resource URI, our key name (the policy name), and the primary key that our policy provides.

Java
String auth = GetSASToken("msblog.servicebus.windows.net", "java-app", eventHubKey);
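The helper itself isn’t shown in the article, so here is a minimal sketch of what such a GetSASToken function can look like, following the SAS token format in Microsoft’s Event Hubs documentation: an HMAC-SHA256 signature over the URL-encoded resource URI and an expiry timestamp. The one-hour expiry is an illustrative choice; add the imports and the method to your application class.

Java
import java.net.URLEncoder;
import java.util.Base64;
import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;

private static String GetSASToken(String resourceUri, String keyName, String key) {
    try {
        // Token expiry in Unix seconds, one hour from now
        String expiry = Long.toString(System.currentTimeMillis() / 1000L + 3600);

        // Sign "<url-encoded resource URI>\n<expiry>" with the policy's primary key
        String stringToSign = URLEncoder.encode(resourceUri, "UTF-8") + "\n" + expiry;
        Mac hmac = Mac.getInstance("HmacSHA256");
        hmac.init(new SecretKeySpec(key.getBytes("UTF-8"), "HmacSHA256"));
        String signature = Base64.getEncoder()
                .encodeToString(hmac.doFinal(stringToSign.getBytes("UTF-8")));

        // Assemble the SharedAccessSignature value used in the Authorization header
        return "SharedAccessSignature sr=" + URLEncoder.encode(resourceUri, "UTF-8")
                + "&sig=" + URLEncoder.encode(signature, "UTF-8")
                + "&se=" + expiry
                + "&skn=" + keyName;
    } catch (Exception e) {
        throw new RuntimeException("Unable to build SAS token", e);
    }
}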

Then, we create a variable to hold the URL of the REST API endpoint, where we’ll post the messages. We also make a JSON array object to batch our messages before sending them. We could send them individually, but batching sets us up to increase throughput later if needed.

Java
// The Event Hubs REST endpoint for posting messages to our event hub
String URL = "https://msblog.servicebus.windows.net/blockchain-usd-btc-price/messages?timeout=60&api-version=2014-01";
// create an array to hold the batch
JSONArray array = new JSONArray();

The batch messages API endpoint requires an array of JSON objects that looks like this:

JSON
[{"Body":"Message1"},{"Body":"Message2"},{"Body":"Message3"}]

Each JSON object must have a key of Body followed by the message. We’ll create a JSON object in this format for each message we receive and add it to the array.

Java
// Received a text message, so let's convert it to a JSON object and store it in the array
JSONObject obj = new JSONObject();
obj.put("Body", message);
array.add(obj);

We'll continue to do this until our JSON array holds more than 100 messages. Then, we’ll create an HTTP POST request with the required headers, including the authorization token we generated earlier. Our request’s body will be the JSON string representation of our array, which matches the API’s required format.

Once we send the message, we’ll close the connection and reset our JSON array so we can build up our next batch of 100 messages:

Java
if (array.size() > 100) {
   // if the batch is over 100 messages, send it and then create a new batch
   CloseableHttpClient httpClient = HttpClientBuilder.create().build();
   try {
       HttpPost request = new HttpPost(URL);
       request.setHeader("Content-Type", "application/vnd.microsoft.servicebus.json");
       request.setHeader("Authorization", auth);
       request.setEntity(new StringEntity(array.toJSONString()));
       CloseableHttpResponse response = httpClient.execute(request);
       response.close();
   }
   finally {
       httpClient.close();
   }
   // reset array
   array = new JSONArray();
}

Within the Azure portal, graphs on our Event Hub’s overview page show the number of requests made and the number of messages received or sent.

The screenshot above shows the requests to our Event Hub. It also indicates 101 messages received, the size of our first batch. So, our application is working as expected.

Next Steps

This article has explored how we can use Azure Event Hubs to receive messages for downstream processing. We successfully connected to a WebSocket to get messages, then forwarded them on to the Event Hub using HTTPS.

We used an example WebSocket from the Blockchain Exchange to build a proof of concept. These technologies can be incredibly beneficial in the real world, especially when you have real-time data feeds like a high volume of transactions from an e-commerce system or interactions on a social network.

Putting messages into the Event Hub frees our application from having to meet demand when there is a message spike. Instead, the messages simply go to the Event Hub. We can scale our downstream processors horizontally to meet the demand.

Using this method ensures we will process all our messages. We simply add more processors if we want to increase the speed. So, it is essential to choose the number of partitions carefully when creating the Event Hub. This setting ultimately determines how much you can scale and the message throughput you can process.

In the following article in this series, we’ll connect our Event Hub to Azure Functions. Azure Functions will process our messages and store them in Cosmos DB. Later, we’ll analyze and visualize the data in Power BI.

To learn how to use Azure Cosmos DB change feed to understand user patterns, perform real-time data analysis and visualization, check out Use Azure Cosmos DB change feed to visualize real-time data analytics.

License

This article has no explicit license attached to it but may contain usage terms in the article text or the download files themselves. If in doubt please contact the author via the discussion board below.
