
Sending Events from Azure Event Hub to Azure SQL Database using Azure Functions

This article describes how to take messages sent to Azure Event Hub and load them into Azure SQL Database using Azure Functions.

Introduction

IoT solutions include a classic scenario: devices send events to a message hub that can accept any number of incoming messages, and then a service picks up the messages and puts them in some storage. The Azure platform provides a variety of services that can help you implement this.

In this article, I will show how to implement an IoT workflow where devices send events to the Azure Event Hub service, and the events are then moved to Azure SQL Database using Azure Functions.

Image 1

Messages will be formatted as JSON documents. An example of one message is:

JSON
{"eventId":200,"deviceId":1,"value":25,"timestamp":"2017-01-22T16:59:46.6169948Z"}

DeviceId and value are hypothetical values that the device sends. The value might be temperature, pressure, speed, or any other reading that a device can report. The article covers the following topics:

  • Create Event Hub
  • Sending Events to Azure Event Hub
  • Moving Events from Azure Event Hub into Azure SQL Database using Azure Functions
  • Setting Up Azure SQL Database
  • Writing Messages from Azure Function into Azure SQL Database
  • End-to-End Test

Create Event Hub

As a first step, you need an Azure Event Hub that will receive events from the devices. You will need the following:

  • One Event Hub namespace with the name code-project-event-hub-demo
  • One Event Hub with the name gatewaytosqldatabase

If you are familiar with Event Hubs, you can skip the rest of this section and go to the Sending Events to Azure Event Hub section.

Otherwise, you can follow instructions in this section to configure Azure Event Hub.

To create an Azure Event Hub, you first need to create an Azure Event Hub namespace that will contain the Event Hub. Open the Azure portal and select New > Internet of Things > Event Hub. In the blade that opens, enter the name of your Event Hub namespace (e.g., code-project-event-hub-demo):

Image 2

Optionally, you can choose the pricing tier (Standard or Basic) and decide in what resource group and region your Azure Event Hub should be placed.

NOTE: It is a good idea to place the Azure Event Hub, Azure Function, and Azure SQL Database in the same region in order to avoid cross-region transfer between these services.

In the previous step, you created a namespace that may contain several Event Hubs. Now you need to create at least one Event Hub that will receive events from devices. Open the namespace details, choose +Event Hub, and add a new Event Hub – in my case, it is called GatewayToSqlDatabase:

Image 3

Optionally, you can define properties of the Event Hub, such as partition count, retention period, etc.

Sending Events to Azure Event Hub

In this example, I will use the Device Simulator app that Microsoft Program Manager Paolo Salvatori created as part of his demo sample. The app simulates devices and sends JSON messages to the Event Hub:

Image 4

Open the Device Simulator app and populate the following text boxes:

  • Namespace is the name of the Azure Event Hub namespace created in the first step of this article – in this example, code-project-event-hub-demo
  • Event Hub is the name of the Event Hub – in this example, gatewaytosqldatabase
  • Key Value is a primary or secondary key for the Azure Event Hub namespace. Open the namespace in the Azure portal, go to Shared access policies > RootManageSharedAccessKey, and copy the primary/secondary key:

Image 5

If you have entered these values correctly, you can start the device simulator, and you will see in the Device Simulator app that messages are being sent.

Also, if you open the Event Hub > Overview blade in the Azure portal, you will see that new messages are being posted to the Event Hub.
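If you prefer to send a few test events from your own code instead of the simulator, a minimal sketch looks like the following. This is only an illustration and assumes the Microsoft.Azure.EventHubs NuGet package; replace the key placeholder with your own value:

C#
using System;
using System.Text;
using System.Threading.Tasks;
using Microsoft.Azure.EventHubs; // assumed NuGet package

public static class TestSender
{
    public static async Task SendTestEventAsync()
    {
        // Namespace connection string; EntityPath points to the Event Hub
        var connectionString =
            "Endpoint=sb://code-project-event-hub-demo.servicebus.windows.net/;" +
            "SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=<your-key>;" +
            "EntityPath=gatewaytosqldatabase";
        var client = EventHubClient.CreateFromConnectionString(connectionString);

        // Build a JSON message in the same format as the device simulator
        var json = $"{{\"eventId\":200,\"deviceId\":1,\"value\":25,\"timestamp\":\"{DateTime.UtcNow:o}\"}}";
        await client.SendAsync(new EventData(Encoding.UTF8.GetBytes(json)));
        await client.CloseAsync();
    }
}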

Moving Events from Azure Event Hub into Azure SQL Database using Azure Functions

Now we need a service that will watch the Event Hub and pick up the events that are sent to it. First, you need to create a new Azure Function app via the Azure portal using New > Compute > Function App. On the blade that opens, set the name of the function app, select a resource group and location (use the same location as the Event Hub if possible), and choose a storage account.

Image 6

When Azure finishes the deployment, open the details of the function app, select +New Function, and choose EventHubTrigger-CSharp (a C# function that will be run whenever the Event Hub receives a new event) from the template gallery.

When you select this template, you need to enter the following information:

  • Name of the function, e.g., code-project-function-demo
  • Name of the Event Hub namespace: code-project-event-hub-demo
  • Event Hub connection:
    • The name of the Event Hub
    • The Event Hub namespace connection string

Once everything is set, go to the function that you created, open the </> Develop tab, and look at the code that is generated by default. If you have already added some events to the Event Hub, you can Run the function and see the messages from the Event Hub in the Logs window (select the Logs button in the top right corner if the logs are not shown).

Image 7

If you run into problems, review your settings. All settings are stored locally in the function.json file. If you press the "View files" link, you can see the list of all files in your function (initially, there will be just two files: function.json and run.csx). Note that the connection property holds the name of an app setting that contains the Event Hub connection string, not the connection string itself. The content of the function.json file should look something like this:

JSON
{
  "bindings": [
    {
      "type": "eventHubTrigger",
      "name": "myEventHubMessage",
      "direction": "in",
      "path": "gatewaytosqldatabase",
      "connection": "code-project-event-hub-demo/gatewaytosqldatabase",
      "consumerGroup": "$Default"
    }
  ],
  "disabled": false
}

Now we have an Azure Event Hub that receives messages from the device (or device simulator in this case) and an Azure Function that will be invoked whenever a new message is delivered to the Event Hub.

Now it is time to create the Azure SQL database and configure the Azure Function to write the messages into the database instead of the log.

Setting Up Azure SQL Database

In this example, we need one Azure SQL database to store the messages sent by the device (or device simulator in this case). Messages could be stored in various stores, such as:

  • Azure Blob Storage or Azure Table Storage if you need to store a large number of messages at a low storage price.
  • Azure DocumentDB if you want to be able to search, filter, and query the messages.
  • Azure SQL Database if you need advanced analytics using the full SQL language, or if you need to correlate information from your messages with other entities in the database (e.g., fraud detection, where you need to analyze events and correlate them with customer behavior).

In this example, I will use Azure SQL Database as a target and show two options:

  1. Store all events in their original format (as JSON text)
  2. Parse the JSON messages sent from devices into separate columns

Creating Destination Table

First, you need to create an Azure SQL database if you don't already have one. Select New > Databases > SQL Database in the Azure portal:

Image 8

Enter the database name (e.g., code-project-database-demo), select the server where the database will be placed (choose a server in the same location as the Event Hub and Azure Function), and pick a pricing tier.

We need one table (I will call it Events) where the messages from the Azure Event Hub will be stored. This table can be very simple – it may have one string column for the message and, optionally, a date column that records when the message was received:

SQL
CREATE TABLE [dbo].[Events](
       Data NVARCHAR(MAX),
       DateAdded datetime2 DEFAULT ( GETDATE() ),
       INDEX cci CLUSTERED COLUMNSTORE
);

The interesting part is the INDEX cci CLUSTERED COLUMNSTORE (or CCI, as I will call it in the rest of the article) that is defined on the table. A clustered columnstore index organizes the table in columnstore format, which is optimized for analyzing events. The two main benefits of the columnstore format are:

  • High compression – CCI provides 10-20x compression on your table (depending on your data), so you may see 90-95% of your table space "disappear".
  • Batch mode analytics – CCI processes rows in "batch mode". Unlike standard queries that process data row by row, CCI takes 100-900 rows in one batch and processes them together; as a result, your queries might run 10x faster compared to standard tables.

NOTE: CLUSTERED COLUMNSTORE indexes are available only if your database is on the Premium tier.

Another option is to create a table with a separate column for every field from the JSON message:

SQL
CREATE TABLE [dbo].[Events](
    [EventId] [int] NOT NULL,
    [DeviceId] [int] NOT NULL,
    [Value] [int] NOT NULL,
    [Timestamp] [datetime2](7) NULL,
    INDEX cci CLUSTERED COLUMNSTORE
);

The table has a column for each field that devices send in their JSON messages. As in the previous example, I'm adding a CLUSTERED COLUMNSTORE index on the table for high compression and fast querying. The catch is that we need to parse the JSON message and extract the fields before we can add a row to the table.

Stored Procedure for Inserting Messages

I will also create a stored procedure that takes a JSON message sent to the Event Hub, parses it, and places it into the target table:

SQL
CREATE PROCEDURE dbo.ImportEvents @Events NVARCHAR(MAX)
AS BEGIN
    INSERT INTO dbo.Events (Data)
    VALUES (@Events);
END

In this case, the stored procedure does not perform any complex action. It just takes a string message and inserts it into the Events table.

This procedure assumes that you send a single JSON message from the Event Hub. If you are planning to send an array of messages, you can use the following procedure:

SQL
CREATE PROCEDURE dbo.ImportEvents  @Events NVARCHAR(MAX)
AS  BEGIN
    INSERT INTO dbo.Events (Data)
    SELECT value FROM OPENJSON( @Events);
END 

The OPENJSON function takes an array of JSON objects and splits it into its elements. It returns one row for each object in the array, so you can import them into the Events table.
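As an illustration of what such an array payload might look like on the sender's side, here is a small sketch that builds a JSON array of event objects. It assumes the Newtonsoft.Json (Json.NET) package, which is just one possible choice of serializer:

C#
using System;
using Newtonsoft.Json; // assumed package, for illustration only

public static class BatchPayloadExample
{
    public static string BuildBatch()
    {
        // Two hypothetical events in the same shape as the simulator messages
        var batch = new[]
        {
            new { eventId = 201, deviceId = 1, value = 26, timestamp = DateTime.UtcNow },
            new { eventId = 202, deviceId = 2, value = 31, timestamp = DateTime.UtcNow }
        };
        // Serializes to a JSON array; OPENJSON(@Events) returns one row per element
        return JsonConvert.SerializeObject(batch);
    }
}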

A more interesting case is inserting events into the table that has separate columns for each field in the JSON message.

SQL
CREATE PROCEDURE dbo.ImportEvents @Events NVARCHAR(MAX)
AS BEGIN
    MERGE INTO dbo.Events AS ExistingEvent
    USING (SELECT *
       FROM OPENJSON(@Events)
       WITH ([eventId] int, [deviceId] int, [value] int,
             [timestamp] datetime2(7))) AS NewEvent
       ON (ExistingEvent.EventId = NewEvent.EventId)
    WHEN MATCHED THEN
        UPDATE SET
                   ExistingEvent.DeviceId = NewEvent.DeviceId,
                   ExistingEvent.Value = NewEvent.Value,
                   ExistingEvent.Timestamp = NewEvent.Timestamp
    WHEN NOT MATCHED THEN
        INSERT (EventId, DeviceId, Value, Timestamp)
        VALUES (NewEvent.EventId, NewEvent.DeviceId, NewEvent.Value, NewEvent.Timestamp);
END

This procedure uses the MERGE statement, which takes the set of new rows and checks whether they already exist in the Events table. If a row from the new set of events matches an existing event by EventId value, the MERGE statement updates the existing row; otherwise, it inserts the values from the new event into the Events table.

To get the set of new event rows serialized in the JSON message, I'm using the new OPENJSON function, which parses JSON text and extracts properties by the names specified in the WITH clause. In this case, we are taking eventId, deviceId, value, and timestamp from the JSON text. OPENJSON returns these values as rows, and the MERGE statement compares them with the existing events.

In the GitHub sample https://github.com/azure-cat-emea/servicefabricjsonsqldb created by Paolo Salvatori, you can see how to use this procedure to import events from the Event Hub via Worker roles and Service Fabric.

Analyzing Messages

The biggest value of Azure SQL Database for storing event messages is the full power of the SQL language for analysis, with ORDER BY, GROUP BY, window aggregates, etc.

The following query shows how to find the average value reported per device:

SQL
SELECT DeviceId, AVG(CAST(Value AS float)) AS avgValue
FROM Events
GROUP BY DeviceId

The assumption here is that you are using the table with a separate column for each field from the JSON message.

If you store the messages in a single string column, you can use the JSON_VALUE function that is available in Azure SQL Database to extract values from the JSON fields and use them in the query:

SQL
SELECT JSON_VALUE(Data, '$.deviceId') AS DeviceId,
       AVG(CAST(JSON_VALUE(Data, '$.value') AS float)) AS avgValue
FROM Events
WHERE DateAdded BETWEEN @startdate AND @enddate
GROUP BY JSON_VALUE(Data, '$.deviceId')

Here you can see the difference between storing events in a single column versus separate columns. If you choose the table with a separate column for each field, you need to parse the JSON text during import (in the procedure), and you need to change the table structure if the message format changes. If you use a single string column for the message, you get faster inserts and more flexibility when the message structure changes, but queries might be slower because you need to parse the JSON in every query.
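If you want to run this kind of analysis from application code rather than from a query window, a minimal ADO.NET sketch could look like the following. The PrintAverages method and the query shape are assumptions for this example; it runs the per-device aggregate against the table with separate columns:

C#
using System;
using System.Data.SqlClient;

public static class DeviceAverages
{
    public static void PrintAverages(string connectionString)
    {
        // Per-device average; CAST avoids integer truncation in AVG
        const string sql =
            "SELECT DeviceId, AVG(CAST(Value AS float)) AS avgValue " +
            "FROM Events GROUP BY DeviceId";
        using (var conn = new SqlConnection(connectionString))
        using (var cmd = new SqlCommand(sql, conn))
        {
            conn.Open();
            using (var reader = cmd.ExecuteReader())
            {
                while (reader.Read())
                {
                    Console.WriteLine($"Device {reader.GetInt32(0)}: {reader.GetDouble(1)}");
                }
            }
        }
    }
}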

Writing Messages from Azure Function into Azure SQL Database

Now it is time to add the Azure Function code that will send the messages from the Event Hub to the Azure SQL database.

First, we need to set up the connection to Azure SQL Database in the settings of the Azure Function app. Open Function app settings and, in the Develop section, choose "Configure app settings". Go to Connection strings and add a new connection string with the following parameters:

  • name: azure-db-connection
  • value: Server=tcp:[servername].database.windows.net;Database=[databasename];User ID=[username];Password=[password];Trusted_Connection=False;Encrypt=True;

You should see something like the following figure:

Image 9

Once the connection is set up, we need to add some data access code. Azure Functions does not have a built-in output binding that connects to Azure SQL Database, so we need to write the data access code ourselves.

You could use Entity Framework, but since we don't have any C# objects, it might be overkill. We only need to execute a simple SQL statement that sends one string parameter to the database.

An alternative would be to write plain ADO.NET code, but you might end up with a lot of data access code for opening/closing connections, handling exceptions, etc.

You can use the Dapper framework to execute queries against Azure SQL Database. To use Dapper, we need to reference the Dapper NuGet package in the project.json file. Azure Functions can use any NuGet package, so we add a project.json file with the following content:

JSON
{
  "frameworks": {
    "net46": {
      "dependencies": {
        "Dapper": "1.50.2"
      }
    }
  }
}

If you use Dapper, you can easily execute any SQL query on a SQL connection:

C#
using Dapper;
using System.Data.SqlClient;

// _connectionString holds the Azure SQL Database connection string
using (var conn = new SqlConnection(_connectionString))
{
    conn.Execute("<your query here>");
}

You can find more details about using Dapper in Azure Functions on Davide Mauri's blog.
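For reference, a minimal sketch of what the function body could look like with Dapper is shown below. This is an assumption-level example, not the implementation used in the rest of this article; it calls the same ImportEvents procedure using the azure-db-connection connection string configured above:

C#
using System.Configuration;
using System.Data;
using System.Data.SqlClient;
using Dapper;

public static void Run(string myEventHubMessage, TraceWriter log)
{
    // Skip empty test invocations
    if (string.IsNullOrWhiteSpace(myEventHubMessage))
        return;

    var connString = ConfigurationManager.ConnectionStrings["azure-db-connection"].ConnectionString;
    using (var conn = new SqlConnection(connString))
    {
        // Pass the raw JSON message to the ImportEvents stored procedure
        conn.Execute("ImportEvents", new { Events = myEventHubMessage },
                     commandType: CommandType.StoredProcedure);
    }
}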

As an alternative, we can use any other data access framework. Below is an example of a project file that references the Belgrade SQL Client data access library:

JSON
{
  "frameworks": {
    "net46":{
      "dependencies": {
        "Belgrade.Sql.Client": "0.6.2"
      }
    }
   }
}

This is another simple library that enables you to execute queries against SQL Server. If you want to use Belgrade SQL Client, you can replace the content of the Run.csx file with the following:

C#
using System.Configuration;
using System.Data.SqlClient;
using System.Threading.Tasks;
using Belgrade.SqlClient.SqlDb;

public static async Task Run(string myEventHubMessage, TraceWriter log)
{
    if (String.IsNullOrWhiteSpace(myEventHubMessage))
        return;
    try {
        string connString = ConfigurationManager.ConnectionStrings["azure-db-connection"].ConnectionString;
        var cmd = new Command(connString);
        var sqlCmd = new SqlCommand("ImportEvents");
        sqlCmd.CommandType = System.Data.CommandType.StoredProcedure;
        sqlCmd.Parameters.AddWithValue("Events", myEventHubMessage);
        await cmd.ExecuteNonQuery(sqlCmd);
    } catch (Exception ex) {
        log.Error($"C# Event Hub trigger function exception: {ex.Message}");
    }
}

First, we check whether the input is an empty string. This can happen if you just press Run to test the function, and in that case we don't want to import an empty string into the database.

Then we fetch the connection string named "azure-db-connection" that was added in the configuration, create a command that executes the ImportEvents procedure, set the command type to stored procedure, add the "Events" parameter with the value of the input message, and execute the command.

If something goes wrong, you will see the exception in the log.

End-to-End Test

Now it is time to try the workflow end-to-end. Open the device simulator, double-check that it points to your Event Hub, and start it. You should see in the Device Simulator log and the Azure Function log that messages are going through the pipeline. Also, if you execute SELECT * FROM Events, you will see the content of the table changing.

Conclusion

The Azure platform provides a variety of services that can be used in a typical IoT workflow:

  • Services for receiving events from devices – Azure Event Hub and Azure IoT Hub
  • Services that can permanently store events – Azure SQL Database, Azure Blob Storage, Azure DocumentDB, and Azure Table Storage
  • Services that can move events from the services that receive them to the services that store them – for example, Azure Functions, Azure Data Factory, and Azure Stream Analytics

In this article, I have shown how you can create a workflow where you receive events in Azure Event Hub and move them to Azure SQL Database using Azure Functions. If you look at the code, you will see that you don't need much code to implement this workflow.

There are alternative solutions for loading events from Azure Event Hub into Azure SQL Database. Paolo Salvatori has described a similar solution that uses Azure Service Fabric to move the messages: https://github.com/azure-cat-emea/servicefabricjsonsqldb.

History

  • 7th February, 2017: Initial version

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)