Environmental Monitoring and Anomaly Detection with IoT and Microsoft Cloud

31 Mar 2015 · CPOL
This article is an entry in the Microsoft Azure IoT Contest.

Introduction

The availability of low-cost sensors for environmental monitoring, coupled with the capabilities of the Microsoft Cloud, creates significant opportunities for building a solid infrastructure for smart cities. Figure 1 shows the architecture that we explored in our application.

Image 1

Figure 1. An Environmental Monitoring Application

Background

The application gathered temperature data from a sensor connected to an Arduino-compatible board (Gertduino) attached to a Raspberry Pi (RPI) board. A Python script captured the readings and transmitted them periodically as events to an Event Hub in the Microsoft Cloud. A worker role consumed the events from the Event Hub's 16 partitions and stored them in Table Storage. Users accessed the data through a web role that hosted a responsive website built with HTML5, CSS3, JavaScript and Bootstrap. WCF services retrieved the data from Table Storage on demand.

Using the Code

Communication between the RPI and the Event Hub required a Shared Access Signature (SAS) key, which was generated using an open-source Signature Generator application downloaded from Microsoft MSDN (Figure 2).

Image 2

Figure 2. Shared Access Signature Generation Application
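
The generated key has the `sr`, `sig`, `se` and `skn` fields that appear in the simulator code later in this article. As a rough sketch of what such a generator computes, the Python function below signs the URL-encoded resource URI and the expiry time with HMAC-SHA256. The function name `make_sas_token`, the one-hour default lifetime and the `now` parameter are assumptions made for illustration and are not part of the Signature Generator tool.

```python
import base64
import hashlib
import hmac
import time
import urllib.parse


def make_sas_token(resource_uri, key_name, key, ttl_seconds=3600, now=None):
    """Build a Shared Access Signature token string for a Service Bus resource.

    `now` lets the caller pin the clock (handy for testing); by default the
    current time is used. All parameter names here are illustrative.
    """
    current = time.time() if now is None else now
    expiry = str(int(current + ttl_seconds))

    # The resource URI is URL-encoded before it is signed, and it appears
    # in the token in the same encoded form.
    encoded_uri = urllib.parse.quote_plus(resource_uri)

    # String to sign: encoded URI, a newline, then the expiry in Unix seconds.
    string_to_sign = (encoded_uri + "\n" + expiry).encode("utf-8")
    digest = hmac.new(key.encode("utf-8"), string_to_sign, hashlib.sha256).digest()
    signature = urllib.parse.quote_plus(base64.b64encode(digest).decode("ascii"))

    return "SharedAccessSignature sr={}&sig={}&se={}&skn={}".format(
        encoded_uri, signature, expiry, key_name)
```

Calling `make_sas_token("https://gqc-ns.servicebus.windows.net/gqc/publishers/mypi/messages", "senderdevice", key)` with the sender policy's key would yield a token of the same shape as the one embedded in the scripts. The token expires after `ttl_seconds`, so a long-running device needs either a long lifetime or a way to regenerate it.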

The generated key was embedded in the Python script running on the RPI, and the captured event (temperature) data was transmitted in JSON format over HTTP. The “Requests” Python package was installed and used to create the POST requests, which made them very easy to build. Each request took about two seconds to execute. The RPI was connected to the local WiFi network through a USB WiFi dongle, and the board was powered with a power adapter.
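
The Python script itself is not reproduced in this article, so the following is only a minimal sketch of what such a sender could look like. The namespace, hub and device names and the content type are taken from the C# simulator shown later; the JSON field name `Temperature`, the helper names and the placeholder token are assumptions.

```python
import json
import time

# Values borrowed from the simulator code in this article.
SERVICE_NAMESPACE = "gqc-ns"
HUB_NAME = "gqc"
DEVICE_NAME = "mypi"
# Placeholder only: paste the token produced by the Signature Generator here.
SAS_TOKEN = "SharedAccessSignature sr=<uri>&sig=<signature>&se=<expiry>&skn=senderdevice"


def build_url(namespace, hub, device):
    """Return the per-publisher Event Hub endpoint used for HTTP sends."""
    return "https://{}.servicebus.windows.net/{}/publishers/{}/messages".format(
        namespace, hub, device)


def build_body(temperature_c, timestamp=None):
    """Serialize one reading as a flat JSON object (field names are assumed)."""
    ts = int(time.time() if timestamp is None else timestamp)
    return json.dumps({"Temperature": temperature_c, "unixtimestamp": ts})


def send_event(temperature_c):
    """POST one reading to the Event Hub and return the HTTP status code."""
    import requests  # third-party package: pip install requests

    headers = {
        "Authorization": SAS_TOKEN,
        "Content-Type": "application/atom+xml;type=entry;charset=utf-8",
    }
    response = requests.post(
        build_url(SERVICE_NAMESPACE, HUB_NAME, DEVICE_NAME),
        data=build_body(temperature_c),
        headers=headers,
        timeout=10,
    )
    return response.status_code
```

A device loop would read the sensor, call `send_event(reading)` and sleep until the next sample. Given the roughly two-second round trip reported above, the sampling interval should be comfortably longer than that.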

The next key piece of work was to create a worker role that consumed the events. Since the RPI was in a separate location, a device simulator was built to make testing easier. The code for the device simulator is provided next.

DEVICE SIMULATOR CODE:

C#
using System;
using System.IO;
using System.Net;
using System.Threading;

namespace IoTDeviceSimulator
{
    class Program
    {
        static void Main(string[] args)
        {
            // Generate a SAS key with the Signature Generator: https://github.com/sandrinodimattia/RedDog/releases
            var sas = "SharedAccessSignature sr=https%3a%2f%2fgqc-ns.servicebus.windows.net%2fgqc%2fpublishers%2fmypi%2fmessages&sig=uODLVMFxG3DwM5qGtINr3rkA%3d&se=64709&skn=senderdevice";
            // Namespace info.
            var serviceNamespace = "gqc-ns";
            var hubName = "gqc";
            var deviceName = "mypi";
            Console.WriteLine("Starting device: {0}", deviceName);

            var uri = new Uri(String.Format("https://{0}.servicebus.windows.net/{1}/publishers/{2}/messages", serviceNamespace, hubName, deviceName));

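            // Create the random source once. Random instances created in quick
            // succession can share a time-based seed and return identical values.
            var random = new Random();

            // Keep sending.
            while (true)
            {
                var eventData = new
                {
                    Stage = random.Next(20, 50),
                    Flow = random.Next(10, 15)
                };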

                var req = WebRequest.CreateHttp(uri);

                req.Method = "POST";

                req.Headers.Add("Authorization", sas);

                req.ContentType = "application/atom+xml;type=entry;charset=utf-8";

                using (var writer = new StreamWriter(req.GetRequestStream()))
                {
                    // Payload shape: {"mypi":{"Stage":12,"Flow":20},"unixtimestamp":12345}
                    // Property names are quoted so that the body is valid JSON.
                    writer.Write("{\"" + deviceName.Replace("-", string.Empty) + "\":");
                    writer.Write("{\"Stage\":" + eventData.Stage + ",");
                    writer.Write("\"Flow\":" + eventData.Flow + "}");
                    writer.Write(",\"unixtimestamp\":" + DateTime.Now.ConvertToUnixTimestamp());
                    writer.Write("}");
                }

                using (var response = req.GetResponse() as HttpWebResponse)
                {
                    Console.WriteLine("Sent stage using legacy HttpWebRequest: {0}", eventData.Stage);

                    Console.WriteLine(" > Response: {0}", response.StatusCode);
                }

                Thread.Sleep(60000);

            }
        }
    }

    public static class ExtensionMethods
    {
        public static Int64 ConvertToUnixTimestamp(this DateTime pDotNetDateTime)
        {
            pDotNetDateTime = TimeZoneInfo.ConvertTimeToUtc(pDotNetDateTime);
            if (pDotNetDateTime.Ticks >= 621355968000000000)
            {
                return (pDotNetDateTime.Ticks - 621355968000000000) / 10000000;
            }
            return 0;

        }
    }
}

The events generated by the RPI and the device simulator were consumed in a worker role (Figure 3).

Image 3

Figure 3. A Worker Role that consumed the events (a code snippet is provided later)

As mentioned earlier, we first built a desktop prototype consisting of a device simulator (Figure 4) and a consumer, and then moved the consumer code into a worker role (Figure 5).

Image 4

Figure 4. A Device Simulator (a code snippet was provided earlier)

C#
private async Task RunAsync(CancellationToken cancellationToken)
{
    Trace.TraceInformation("Working");

    var partitionCount = 16;
    var serviceNamespace = "gqc-ns";
    var hubName = "gqc";
    var receiverKeyName = "receiver";
    var receiverKey = "uXZt2dtha/ULhgd=";

    // Start one long-running receiver per partition. The tasks are collected rather
    // than awaited one by one: each receive loop runs until cancellation, so awaiting
    // inside the for loop would never get past partition 0.
    var workers = new List<Task>();
    for (int i = 0; i < partitionCount; i++)
    {
        workers.Add(Task.Factory.StartNew((state) =>
        {
            Console.WriteLine("Starting worker to process partition: {0}", state);
            var factory = MessagingFactory.Create(
                ServiceBusEnvironment.CreateServiceUri("sb", serviceNamespace, ""),
                new MessagingFactorySettings()
                {
                    TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(receiverKeyName, receiverKey),
                    TransportType = Microsoft.ServiceBus.Messaging.TransportType.Amqp
                });
            var receiver = factory.CreateEventHubClient(hubName)
                .GetDefaultConsumerGroup()
                .CreateReceiver(state.ToString(), DateTime.UtcNow);

            // Stop when the role is asked to shut down.
            while (!cancellationToken.IsCancellationRequested)
            {
                // Receive could fail; a production worker needs a retry policy here.
                var messages = receiver.Receive(10);
                foreach (var message in messages)
                {
                    // Expects a flat JSON object of numeric readings.
                    var eventBody = Newtonsoft.Json.JsonConvert.DeserializeObject
                        <Dictionary<string, double>>(Encoding.UTF8.GetString(message.GetBytes()));

                    var properties = new Dictionary<string, EntityProperty>();
                    foreach (var item in eventBody.Keys)
                    {
                        properties.Add(item, new EntityProperty(eventBody[item].ToString()));
                    }

                    // The row key is truncated to the minute, so events that arrive
                    // within the same minute are merged into a single row.
                    DynamicTableEntity dte = new DynamicTableEntity("ccno1",
                        DateTime.Now.ToString("yyyy-MM-ddTHH:mm:00zzz")) { Properties = properties };
                    CloudTable table = getSensorTable();
                    TableOperation insertOp = TableOperation.InsertOrMerge(dte);
                    table.Execute(insertOp);
                }
            }

            Console.WriteLine("Stopping: {0}", state);
            receiver.Close();
        }, i, TaskCreationOptions.LongRunning));
    }

    // Keep RunAsync alive until every partition worker has stopped.
    await Task.WhenAll(workers);
}
Figure 5. Consuming the events and inserting them into Table Storage

Moving the consumer into a worker role turned out to be a bit challenging because a worker role behaves differently from a desktop application: it stays in its Run loop for as long as the role instance is running. The handling of async tasks therefore has to be examined very carefully. After reviewing several articles on the topic, we were able to put together a worker role that behaved reliably. The resulting table in Table Storage is shown in Figure 6.
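
To illustrate the kind of task handling involved, here is a language-neutral sketch written in Python with `asyncio`. It is not the worker role's code: the partitions are simulated with in-memory queues, and every name in it is hypothetical. The point it shows is that all partition consumers are started first and awaited together, and that each consumer checks a shared stop flag so the whole group can shut down cleanly.

```python
import asyncio


async def consume_partition(partition_id, queue, stop, handled):
    """Drain one simulated partition until the stop flag is set."""
    while not stop.is_set():
        try:
            # Bounded wait so the loop re-checks the stop flag regularly.
            event = await asyncio.wait_for(queue.get(), timeout=0.1)
        except asyncio.TimeoutError:
            continue
        handled.append((partition_id, event))


async def run_workers(queues, run_seconds):
    """Run one consumer per partition, then stop them all cooperatively."""
    stop = asyncio.Event()
    handled = []

    # Start every consumer before awaiting any of them. Awaiting each one in
    # turn would block on the first partition, whose loop only ends on stop.
    tasks = [
        asyncio.create_task(consume_partition(i, q, stop, handled))
        for i, q in enumerate(queues)
    ]

    await asyncio.sleep(run_seconds)  # stands in for the role's lifetime
    stop.set()                        # request shutdown
    await asyncio.gather(*tasks)      # wait until every consumer has exited
    return handled
```

In a real worker role the stop flag corresponds to the cancellation token passed to the run method, and the bounded wait corresponds to a receive call with a timeout.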

Image 5

Figure 6. Table Storage of Event Data
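
The layout of that table follows from how the consumer builds each entity: a fixed partition key (`ccno1`), a row key made of the timestamp truncated to the minute, and one string property per reading. The Python sketch below mimics that mapping with an in-memory dictionary standing in for Table Storage. The function names and the `Humidity` field used in the usage note are hypothetical, and the string formatting of values differs slightly from .NET.

```python
import json
from datetime import datetime, timezone


def to_entity(raw_body, received_at, partition_key="ccno1"):
    """Map one JSON event to a table entity with a minute-resolution row key."""
    readings = json.loads(raw_body)
    minute = received_at.replace(second=0, microsecond=0)
    entity = {"PartitionKey": partition_key, "RowKey": minute.isoformat()}
    for name, value in readings.items():
        # Readings are stored as strings, as in the consumer code.
        entity[name] = str(float(value))
    return entity


def insert_or_merge(table, entity):
    """Insert the entity, or merge its properties into an existing row."""
    key = (entity["PartitionKey"], entity["RowKey"])
    table.setdefault(key, {}).update(entity)
```

With this scheme, two events received at 10:15:42 and 10:15:59 land in the same row: a `Temperature` reading followed by a `Humidity` reading produces one row holding both properties, while a second `Temperature` reading in the same minute overwrites the first. A device that reports more than once per minute therefore needs a finer-grained row key.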

The stored measurements can be displayed along with the output of an environmental simulation model against a Bing Maps backdrop, using a web role built with HTML5, CSS3 and JavaScript. The model is executed by another worker role, and its results are stored in Blob Storage and Table Storage. The web role was provisioned in the Microsoft Cloud and hosted WCF services that extract data from Cloud Storage for rapid visualization. A sample visualization of modeling results (in red) together with sensor data (in blue) is shown in Figure 7.

Image 6

Figure 7. Display of live sensor data using Bing Maps

Points of Interest

  1. Use of Power BI for visualization
  2. Use of Stream Analytics to process the events in the Event Hub
  3. Use of HDInsight to move large datasets from Table Storage into HDInsight clusters and then process those datasets
  4. Use of Microsoft Machine Learning Studio to connect directly to Cloud Storage through the REST API and then run appropriate algorithms to detect anomalies in the data
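
As a very small illustration of what detecting anomalies in a temperature series can mean, the sketch below flags readings that lie far from the mean of the preceding window. It is a plain rolling z-score written for this article, not the algorithm used by Machine Learning Studio, and the function name, window size and threshold are arbitrary assumptions.

```python
from collections import deque
from statistics import mean, pstdev


def find_anomalies(readings, window=5, threshold=3.0):
    """Return the indices of readings that deviate strongly from recent history."""
    recent = deque(maxlen=window)
    anomalies = []
    for index, value in enumerate(readings):
        # Only judge a reading once a full window of history is available.
        if len(recent) == window:
            mu = mean(recent)
            sigma = pstdev(recent)
            if sigma > 0 and abs(value - mu) / sigma > threshold:
                anomalies.append(index)
        recent.append(value)
    return anomalies
```

For the series `[20.0, 20.5, 19.5, 20.2, 19.8, 35.0, 20.1]`, the function returns `[5]`, the index of the 35.0 spike. Note that a flagged value still enters the window, which temporarily widens the spread and makes the detector less sensitive right after a spike.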

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)