Introduction
The availability of low cost sensors for environmental monitoring coupled with the capabilities of the Microsoft Cloud provides a set of enormous opportunities in building a solid infrastructure for smart cities. An architecture that was explored in our application is shown in Figure 1.
Figure 1. An Environmental Monitoring Application
Background
The application was able to successfully gather temperature data from a sensor connected to an Arduino compatible board (Gertduino) attached to a Raspberry Pi (RPI) board. A python script captured the data and transmitted that periodically as events to an Event Hub in the Microsoft Cloud. A worker role consumed the events from the 16 partitions in the Event Hub and stored the information into the Table Storage. The user accessed that information using a Web Role which extracted that information from the Table Storage. That Web Role supported a responsive website built using HTML5, CSS3, JavaScript and bootstrap. WCF Services were used to extract data from the Table Storage on demand.
Using the Code
The communication between the RPI and the Event Hub required a Shared Access Signature (SAS) key, and that was generated using an open source Signature Generator application that was downloaded from Microsoft MSDN (Figure 2).
Figure 2. Share Access Signature Generation Application
The generated key was embedded into the Python script running on the RPI, and the captured events (temperature) data was transmitted in JSON format using http. The “Requests
” python package was installed and used to create the POST
request. The use of “Requests
” made it very easy to create those requests. Each request took about two seconds to execute. The RPI was connected to local WiFi network using a USB WiFi dongle and the board was powered with a power adapter.
The next key piece of work was to create a worker role that consumed the events. Since the RPI was in a separate location, a device simulator was built to make testing easier. The code for the device simulator is provided next.
DEVICE SIMULATOR CODE:
using System;
using System.IO;
using System.Net;
using System.Threading;
namespace IoTDeviceSimulator
{
class Program
{
static void Main(string[] args)
{
var sas = "SharedAccessSignature sr=https%3a%2f%2fgqc-ns.servicebus.windows.net%2fgqc%2fpublishers%2fmypi%2fmessages&sig=uODLVMFxG3DwM5qGtINr3rkA%3d&se=64709&skn=senderdevice";
var serviceNamespace = "gqc-ns";
var hubName = "gqc";
var deviceName = "mypi";
Console.WriteLine("Starting device: {0}", deviceName);
var uri = new Uri(String.Format("https://{0}.servicebus.windows.net/{1}/publishers/{2}/messages", serviceNamespace, hubName, deviceName));
while (true)
{
var eventData = new
{
Stage = new Random().Next(20, 50),
Flow = new Random().Next(10, 15)
};
var req = WebRequest.CreateHttp(uri);
req.Method = "POST";
req.Headers.Add("Authorization", sas);
req.ContentType = "application/atom+xml;type=entry;charset=utf-8";
using (var writer = new StreamWriter(req.GetRequestStream()))
{
writer.Write(@"{" + deviceName.Replace("-", string.Empty) + ":");
writer.Write("{ Stage: " + eventData.Stage + ",");
writer.Write("Flow: " + eventData.Flow + "}");
writer.Write(", unixtimestamp: " + DateTime.Now.ConvertToUnixTimestamp());
writer.Write("}");
}
using (var response = req.GetResponse() as HttpWebResponse)
{
Console.WriteLine("Sent stage using legacy HttpWebRequest: {0}", eventData.Stage);
Console.WriteLine(" > Response: {0}", response.StatusCode);
}
Thread.Sleep(60000);
}
}
}
public static class ExtensionMethods
{
public static Int64 ConvertToUnixTimestamp(this DateTime pDotNetDateTime)
{
pDotNetDateTime = TimeZoneInfo.ConvertTimeToUtc(pDotNetDateTime);
if (pDotNetDateTime.Ticks >= 621355968000000000)
{
return (pDotNetDateTime.Ticks - 621355968000000000) / 10000000;
}
return 0;
}
}
}
The events generated by the RPI and the device simulator were consumed in a worker role (Figure 3).
Figure 3. A Worker Role that consumed the events (a code snippet is provided later)
As mentioned earlier, we first built a desktop prototype that created a device simulator (Figure 4) and a consumer , and then moved that consumer code into a worker role (Figure 5).
Figure 4. A Device Simulator (A code snippet was provided earlier)
private async Task RunAsync(CancellationToken cancellationToken)
{
while (!cancellationToken.IsCancellationRequested)
{
Trace.TraceInformation("Working");
var partitionCount = 16;
var serviceNamespace = "gqc-ns";
var hubName = "gqc";
var receiverKeyName = "receiver";
var receiverKey = "uXZt2dtha/ULhgd=";
CancellationTokenSource cts = new CancellationTokenSource();
for (int i = 0; i <= partitionCount - 1; i++)
{
await Task.Factory.StartNew((state) =>
{
Console.WriteLine("Starting worker to process partition: {0}", state);
var factory = MessagingFactory.Create(ServiceBusEnvironment.CreateServiceUri
("sb", serviceNamespace, ""), new MessagingFactorySettings()
{
TokenProvider = TokenProvider.CreateSharedAccessSignatureTokenProvider(receiverKeyName, receiverKey),
TransportType = Microsoft.ServiceBus.Messaging.TransportType.Amqp
});
var receiver = factory.CreateEventHubClient(hubName)
.GetDefaultConsumerGroup()
.CreateReceiver(state.ToString(), DateTime.UtcNow);
while (true)
{
var messages = receiver.Receive(10);
foreach (var message in messages)
{
var eventBody = Newtonsoft.Json.JsonConvert.DeserializeObject
<Dictionary<string, double>>(Encoding.Default.GetString(message.GetBytes()));
Dictionary<string, EntityProperty> properties =
new Dictionary<string, EntityProperty>();
foreach (var item in eventBody.Keys)
{
properties.Add(item, new EntityProperty(eventBody[item].ToString()));
}
DynamicTableEntity dte = new DynamicTableEntity("ccno1",
DateTime.Now.ToString("yyyy-MM-ddTHH:mm:00zzz")) { Properties = properties };
CloudTable table = getSensorTable();
TableOperation insertOp = TableOperation.InsertOrMerge(dte);
table.Execute(insertOp);
}
if (cts.IsCancellationRequested)
{
Console.WriteLine("Stopping: {0}", state);
receiver.Close();
}
}
}, i);
}
await Task.Delay(1000);
}
}
Figure 5. Consuming the events and inserting them into Table Storage
This turned out to be bit challenging because a worker role behaves a bit differently (it is always in the Run loop) compared to a desktop application. So the considerations for Async tasks have to be examined very carefully. After reviewing several articles on the topic, we were able to put together a worker role that behaved in a reliable manner. The resulting table in the Table Storage is shown in Figure 6.
Figure 6. Table Storage of Event Data
The stored measurements could be displayed along with the output of an environmental simulation model against a Bing Maps backdrop using a webrole compliant with HTML5, CSS3 and JavaScript. The model is executed by another worker role, and the results are stored in Blob Storage and Table Storage. The webrole was provisioned in the Microsoft Cloud and hosted WCF services to extract data from the Cloud Storage for rapid visualization. A sample visualization of modeling results (in red) along with the display of sensor data (in blue) are shown in Figure 7.
Figure 7. Display of live sensor data using Bing Maps
Points of Interest
- Use of PowerBI for visualization
- Use of Stream Analytics to process the events in the Event Hub
- Use of HDInsight to move large datasets from the table storage in the HDInsight clusters, and then processing those datasets
- Use of Microsoft Machine Learning Studio to directly connect to the Cloud Storage using REST API, and the running the appropriate algorithms to detect anomalies in the data.