Deploying a machine learning model to the edge with Microsoft Azure* IoT Edge can help scale an IoT application. Azure IoT Edge works by packaging the solution as a containerized module that is pushed down to machines at the edge. This paper converts a Python*-based motor defect detector solution into a deployable Azure IoT Edge module. The motor defect detector uses a K-means clustering algorithm for predictive maintenance, determining whether motor bearings are likely to fail. The module analyzes simulated data from the motors and then sends messages back to the IoT Hub about the status of each bearing.
Figure 1: Flow of module from Development Machine to Edge Device
Hardware
Setting up Development Environment and Microsoft Azure*
Follow the Python module tutorial here to make sure all prerequisites are set up. It walks through setting up Visual Studio Code and other resources on the development machine, as well as creating the necessary resources in Azure. The main things to have in Azure are a standard-tier IoT Hub, the edge device registered to the hub, and a container registry to store the container images.
Tips
The Cookiecutter install location needs to be added to the ‘Path’ environment variable on the development machine. On the development machine used here, it was installed at the location below.
C:\Users\\AppData\Roaming\Python\Python36\Scripts\
The development machine should use Python 3 and also needs the following packages installed so that Visual Studio Code doesn’t flag errors in the tutorial code:
pip install iothub_client pandas numpy scikit-learn scipy
Pylint might also need to be installed for Visual Studio Code if it is not already present.
pip install pylint
Restart Visual Studio Code so it can find the installations.
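A quick way to confirm the installations is a short check like the one below. This is a minimal sketch rather than part of the tutorial: the helper name `missing_modules` is hypothetical, and the module list assumes the import names used by the code in this paper (`sklearn` is the import name of scikit-learn).

```python
import importlib.util

# Import names used by main.py and utils.py in this paper (assumed list).
REQUIRED_MODULES = ["iothub_client", "pandas", "numpy", "sklearn", "scipy"]


def missing_modules(names):
    """Return the top-level module names the current interpreter cannot find."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    missing = missing_modules(REQUIRED_MODULES)
    if missing:
        print("Missing modules:", ", ".join(missing))
    else:
        print("All required modules were found.")
```

Run it with the same Python 3 interpreter that Visual Studio Code is configured to use, since each interpreter has its own set of installed packages.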
Creating the Module
On the development machine we will use Visual Studio Code to create the module to be deployed to the Tank edge device.
- Create a "New IoT Edge Solution" by right clicking on modules.
- Call it MotorDefectDetectorSolution.
- Select Python Module and call the module KmeansModule.
- Enter in the registry address: .azurecr.io/KmeansModule
- The new Edge Solution will open.
- Copy in kmeanModel.npy from the motor defect detector GitHub* to the KmeansModule. This is the model file.
- Create utils.py and copy in the code below. utils.py handles most of the mathematical calculations. It has been edited from the GitHub utils.py to use the 1st test set by default and to remove the unused plotting functions.
# importing the libraries
import numpy as np
import pandas as pd
#import matplotlib.pyplot as plt
from scipy.fftpack import fft
from scipy.spatial.distance import cdist
#from sklearn import cluster

# cal_Labels takes the number of files as input and generates the labels based on a 70-30 split.
# files for the testset1 = 2148, testset2 = 984, testset3 = 6324
def cal_Labels(files):
    range_low = files*0.7
    range_high = files*1.0
    label = []
    for i in range(0,files):
        if(i < range_low):
            label.append(0)
        elif(i >= range_low and i <= range_high):
            label.append(1)
        else:
            label.append(2)
    return label

# cal_amplitude takes the FFT data and n (the number of maximum amplitudes) as input
# and returns the top n frequencies with the highest amplitude
def cal_amplitude(fftData,n):
    ifa = []
    ia = []
    amp = abs(fftData[0:int(len(fftData)/2)])
    freq = np.linspace(0,10000,num = int(len(fftData)/2))
    ida = np.array(amp).argsort()[-n:][::-1]
    ia.append([amp[i] for i in ida])
    ifa.append([freq[i] for i in ida])
    return(ifa,ia)

# this function calculates the top 5 frequencies with the highest amplitude
# and returns one list per maximum
def cal_max_freq(files,path):
    freq_max1, freq_max2, freq_max3, freq_max4, freq_max5 = ([] for _ in range(5))
    for f in files:
        temp = pd.read_csv(path+f, sep = "\t",header = None)
        temp_freq_max1,temp_freq_max2,temp_freq_max3,temp_freq_max4,temp_freq_max5 = ([] for _ in range(5))
        rhigh = 8
        for i in range(0,rhigh):
            t = fft(temp[i])
            ff,aa = cal_amplitude(t,5)
            temp_freq_max1.append(np.array(ff)[:,0])
            temp_freq_max2.append(np.array(ff)[:,1])
            temp_freq_max3.append(np.array(ff)[:,2])
            temp_freq_max4.append(np.array(ff)[:,3])
            temp_freq_max5.append(np.array(ff)[:,4])
        freq_max1.append(temp_freq_max1)
        freq_max2.append(temp_freq_max2)
        freq_max3.append(temp_freq_max3)
        freq_max4.append(temp_freq_max4)
        freq_max5.append(temp_freq_max5)
    return(freq_max1,freq_max2,freq_max3,freq_max4,freq_max5)

# create_dataframe builds the feature table for one bearing (column index) from the five frequency lists
def create_dataframe(freq_max1,freq_max2,freq_max3,freq_max4,freq_max5,bearing):
    result = pd.DataFrame()
    result['fmax1'] = list((np.array(freq_max1))[:,bearing])
    result['fmax2'] = list((np.array(freq_max2))[:,bearing])
    result['fmax3'] = list((np.array(freq_max3))[:,bearing])
    result['fmax4'] = list((np.array(freq_max4))[:,bearing])
    result['fmax5'] = list((np.array(freq_max5))[:,bearing])
    x = result[["fmax1","fmax2","fmax3","fmax4","fmax5"]]
    return x
Code 1: utils.py
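The feature extraction in utils.py boils down to taking the spectrum of each channel and keeping the frequencies with the largest amplitudes. The sketch below illustrates that idea on a synthetic signal using only the standard library, so it is not the code from utils.py: the function name `top_frequencies` is hypothetical, it uses a naive DFT instead of `scipy.fftpack.fft`, and it maps bin k to k * sample_rate / N rather than to the fixed 0 to 10000 axis that utils.py builds with `np.linspace`.

```python
import cmath
import math


def top_frequencies(samples, sample_rate, n):
    """Return the n frequencies (Hz) with the largest DFT magnitude, strongest first."""
    size = len(samples)
    half = size // 2  # only the first half of the spectrum is unique for real input
    magnitudes = []
    for k in range(half):
        coeff = sum(
            samples[t] * cmath.exp(-2j * math.pi * k * t / size)
            for t in range(size)
        )
        magnitudes.append(abs(coeff))
    strongest = sorted(range(half), key=lambda k: magnitudes[k], reverse=True)[:n]
    return [k * sample_rate / size for k in strongest]


# Synthetic one-second signal: a strong 50 Hz tone plus a weaker 120 Hz tone.
SAMPLE_RATE = 256
signal = [
    math.sin(2 * math.pi * 50 * t / SAMPLE_RATE)
    + 0.5 * math.sin(2 * math.pi * 120 * t / SAMPLE_RATE)
    for t in range(SAMPLE_RATE)
]

peaks = top_frequencies(signal, SAMPLE_RATE, 2)
print(peaks)  # [50.0, 120.0]
```

In utils.py the same selection is done with `argsort` on the amplitude array, and the five strongest frequencies per bearing channel become the features passed to the K-means model.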
- Copy the code from main.py below into the default main.py. This is where the program runs and sends messages about the status of the bearings. To simulate data being generated, the script downloads the NASA data sets used in the original GitHub project and extracts the 1st test set. It then moves the 1st test set files one by one into the folder /tmp/test. The program pulls its data from this folder, which simulates the motor running and gathering data over time.
import random
import time
import sys
import iothub_client
# pylint: disable=E0611
from iothub_client import IoTHubModuleClient, IoTHubClientError, IoTHubTransportProvider
from iothub_client import IoTHubMessage, IoTHubMessageDispositionResult, IoTHubError
import pandas as pd
import numpy as np
from utils import cal_max_freq
import os
import urllib.request
import shutil
def checkBearings(hubManager):
    datadir= '/tmp/1st_test/'
    filedir = '/tmp/test/'
    try:
        if not os.path.exists(datadir):
            os.system("mkdir /tmp/test")
            print("data not found, downloading")
            urllib.request.urlretrieve("https://ti.arc.nasa.gov/c/3/", "/tmp/IMS.7z")
            print("downloaded, now unzipping")
            os.system("7za x /tmp/IMS.7z -o/tmp/")
            os.system("unrar x /tmp/1st_test.rar /tmp/")
            print("unzipped")
        # move the oldest data file into the folder the analysis reads from
        files = [x for x in os.listdir(datadir)]
        oldest= min(os.listdir(datadir), key=lambda f: os.path.getctime("{}/{}".format(datadir, f)))
        os.rename(datadir + oldest, filedir + oldest)
    except IOError as e:
        print(e)
        print("error end")

    # load the model
    filename = "kmeanModel.npy"
    # allow_pickle=True is needed on NumPy 1.16.3 and later to load the pickled model object
    model = np.load(filename, allow_pickle=True).item()

    # iteration for 1st_test data
    rhigh = 8
    moredata= True
    while moredata:
        try:
            # load the files
            all_files = os.listdir(filedir)
            freq_max1,freq_max2,freq_max3,freq_max4,freq_max5 = cal_max_freq(all_files,filedir)
        except IOError as e:
            print("you have entered either the wrong data directory path or filepath ")
            print(e)
            print("error end")

        #testlabels = []
        for i in range(0,rhigh):
            print("checking for the bearing",i+1)
            result = pd.DataFrame()
            result['freq_max1'] = list((np.array(freq_max1))[:,i])
            result['freq_max2'] = list((np.array(freq_max2))[:,i])
            result['freq_max3'] = list((np.array(freq_max3))[:,i])
            result['freq_max4'] = list((np.array(freq_max4))[:,i])
            result['freq_max5'] = list((np.array(freq_max5))[:,i])
            X = result[["freq_max1","freq_max2","freq_max3","freq_max4","freq_max5"]]
            label = model.predict(X)
            # count how many of the last 100 predictions fall into the failure clusters
            labelfive = list(label[-100:]).count(5)
            labelsix = list(label[-100:]).count(6)
            labelseven = list(label[-100:]).count(7)
            totalfailur = labelfive+labelsix+labelseven#+labelfour
            ratio = (totalfailur/100)*100
            if(ratio >= 25):
                print("bearing"+ str(i+1) + " is suspected to fail")
                hubManager.send_event_to_output("output2", "bearing"+ str(i+1) + " is suspected to fail", 0)
            else:
                print("bearing"+ str(i+1) + " is working in normal condition")
                hubManager.send_event_to_output("output2", "bearing"+ str(i+1) + " is working in normal condition", 0)

        # move the next oldest file over, or stop when no data is left
        files = [x for x in os.listdir(datadir)]
        if len(files):
            oldest= min(os.listdir(datadir), key=lambda f: os.path.getctime("{}/{}".format(datadir, f)))
            os.rename(datadir + oldest, filedir + oldest)
        else:
            moredata = False
    print("done")
# messageTimeout - the maximum time in milliseconds until a message times out.
# The timeout period starts at IoTHubModuleClient.send_event_async.
# By default, messages do not expire.
MESSAGE_TIMEOUT = 10000

# global counters
RECEIVE_CALLBACKS = 0
SEND_CALLBACKS = 0

# Choose HTTP, AMQP or MQTT as transport protocol. Currently only MQTT is supported.
PROTOCOL = IoTHubTransportProvider.MQTT

# Callback received when the message that we're forwarding is processed.
def send_confirmation_callback(message, result, user_context):
    global SEND_CALLBACKS
    print ( "Confirmation[%d] received for message with result = %s" % (user_context, result) )
    map_properties = message.properties()
    key_value_pair = map_properties.get_internals()
    print ( " Properties: %s" % key_value_pair )
    SEND_CALLBACKS += 1
    print ( " Total calls confirmed: %d" % SEND_CALLBACKS )

# receive_message_callback is invoked when an incoming message arrives on the specified
# input queue (in the case of this sample, "input1"). Because this is a filter module,
# we will forward this message onto the "output1" queue.
def receive_message_callback(message, hubManager):
    global RECEIVE_CALLBACKS
    message_buffer = message.get_bytearray()
    size = len(message_buffer)
    #print ( " Data: <<<%s>>> & Size=%d" % (message_buffer[:size].decode('utf-8'), size) )
    map_properties = message.properties()
    key_value_pair = map_properties.get_internals()
    #print ( " Properties: %s" % key_value_pair )
    RECEIVE_CALLBACKS += 1
    #print ( " Total calls received: %d" % RECEIVE_CALLBACKS )
    #hubManager.forward_event_to_output("output1", message, 0)
    return IoTHubMessageDispositionResult.ACCEPTED
class HubManager(object):

    def __init__(
            self,
            protocol=IoTHubTransportProvider.MQTT):
        self.client_protocol = protocol
        self.client = IoTHubModuleClient()
        self.client.create_from_environment(protocol)

        # set the time until a message times out
        self.client.set_option("messageTimeout", MESSAGE_TIMEOUT)

        # sets the callback when a message arrives on "input1" queue. Messages sent to
        # other inputs or to the default will be silently discarded.
        self.client.set_message_callback("input1", receive_message_callback, self)

    # Forwards the message received onto the next stage in the process.
    def forward_event_to_output(self, outputQueueName, event, send_context):
        self.client.send_event_async(
            outputQueueName, event, send_confirmation_callback, send_context)

    # Wraps a text message in an IoTHubMessage and sends it to the given output
    def send_event_to_output(self, outputQueueName, message, send_context):
        event=IoTHubMessage(message)
        self.client.send_event_async(
            outputQueueName, event, send_confirmation_callback, send_context)
def main(protocol):
    try:
        print ( "\nPython %s\n" % sys.version )
        print ( "IoT Hub Client for Python3" )

        hub_manager = HubManager(protocol)

        print ( "Starting the IoT Hub Python sample using protocol %s..." % hub_manager.client_protocol )
        print ( "The sample is now waiting for messages and will run indefinitely. Press Ctrl-C to exit. ")
        checkBearings(hub_manager)
        while True:
            time.sleep(1)

    except IoTHubError as iothub_error:
        print ( "Unexpected error %s from IoTHub" % iothub_error )
        return
    except KeyboardInterrupt:
        print ( "IoTHubModuleClient sample stopped" )

if __name__ == '__main__':
    main(PROTOCOL)
Code 2: main.py
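The decision rule inside checkBearings can be isolated as a small pure function, which makes it easier to test without an IoT Hub connection. The sketch below mirrors the logic in main.py (labels 5, 6, and 7 counted over the last 100 predictions, with a 25 percent threshold); the function and constant names are hypothetical and do not appear in the original project.

```python
# Cluster labels that main.py treats as signs of failure.
FAILURE_LABELS = {5, 6, 7}
WINDOW = 100            # number of most recent predictions inspected
THRESHOLD_PERCENT = 25  # share of failure labels that triggers a warning


def bearing_status(labels):
    """Classify a bearing from its sequence of predicted cluster labels."""
    recent = list(labels)[-WINDOW:]
    failures = sum(1 for label in recent if label in FAILURE_LABELS)
    # Like main.py, divide by the fixed window size even if fewer labels exist.
    ratio = failures / WINDOW * 100
    if ratio >= THRESHOLD_PERCENT:
        return "suspected to fail"
    return "working in normal condition"


healthy = [0] * 100
degrading = [0] * 75 + [5] * 10 + [6] * 10 + [7] * 5

print(bearing_status(healthy))    # working in normal condition
print(bearing_status(degrading))  # suspected to fail
```

Because the ratio is always divided by 100, a bearing with fewer than 100 predictions needs at least 25 failure labels before it is flagged, which matches the behavior of main.py early in the run.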
- Update requirements.txt so that the dependencies of the motor defect detector are installed.
azure-iothub-device-client==1.4.0
numpy>=1.11.2
scipy>=1.1.0
pandas>=0.23.4
scikit-learn>=0.19.1
Code 3: requirements.txt
- Update Dockerfile.amd64 as well. Note that the container only comes with Python 2.7 by default, so Python 3 needs to be installed and the Python path updated.
FROM ubuntu:xenial
WORKDIR /app
RUN apt-get update && \
apt-get install -y --no-install-recommends libcurl4-openssl-dev libboost-python-dev p7zip-full unrar python3-pip python3-dev python3-setuptools && \
cd /usr/local/bin && \
ln -s /usr/bin/python3 python && \
pip3 install --upgrade pip && \
rm -rf /var/lib/apt/lists
Code 4: Dockerfile.amd64
With the added files, the module structure should look as below in Visual Studio Code.
Figure 2: IoT Edge Solution in Visual Studio Code
Deploying the Module
1. In Visual Studio Code, right click on deployment.template.json and select Build and Push IoT Edge Solution.
Figure 3: Build and Push IoT Edge Solution location
It will take some time to build the container with all the requirements.
2. Then right click on the Azure IoT Hub Device you want to deploy to and select Create Deployment for Single Device.
Figure 4: Deploy the module
3. Log in to the IoT Edge device, in this case the Tank.
Use the command below to monitor the progress:
sudo iotedge logs KmeansModule -f
This takes some time. The module first downloads the data, extracts it, and then starts moving the files into the /tmp/test folder. It then sends the status messages back to the IoT Hub.
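The file-by-file replay that the module uses to simulate incoming sensor data can be tried locally with temporary folders. This is a minimal sketch under stated assumptions: `move_oldest` is a hypothetical helper, the folders come from `tempfile` instead of /tmp/1st_test and /tmp/test, and files are ordered by modification time (set explicitly here so the order is deterministic), whereas main.py uses `os.path.getctime`.

```python
import os
import shutil
import tempfile


def move_oldest(source_dir, target_dir):
    """Move the oldest file from source_dir to target_dir; return its name or None."""
    names = os.listdir(source_dir)
    if not names:
        return None
    oldest = min(names, key=lambda name: os.path.getmtime(os.path.join(source_dir, name)))
    shutil.move(os.path.join(source_dir, oldest), os.path.join(target_dir, oldest))
    return oldest


# Stand-ins for the extracted data set folder and the folder the analysis reads.
source = tempfile.mkdtemp()
target = tempfile.mkdtemp()

# Create three sample files with increasing timestamps.
for index, name in enumerate(["c.txt", "a.txt", "b.txt"]):
    path = os.path.join(source, name)
    with open(path, "w") as handle:
        handle.write("sample")
    os.utime(path, (1000 + index, 1000 + index))

moved = []
while True:
    name = move_oldest(source, target)
    if name is None:
        break  # no data left, which is where main.py sets moredata to False
    moved.append(name)

print(moved)  # ['c.txt', 'a.txt', 'b.txt']
```

In the module, the bearing analysis runs between each move, so the set of files in the target folder grows by one measurement per iteration.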
The messages can be seen on the development machine in Visual Studio Code by right clicking on the … next to Azure IoT Hub Devices and selecting Start Monitoring D2C Message.
Figure 5: Monitoring the Device to Cloud messages
Figure 6: IoT Hub Messages
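The status strings that main.py sends follow two fixed patterns, so a consumer of the device-to-cloud messages could turn them into structured records. The sketch below is one possible approach, not part of the original project: `parse_status` and the field names are hypothetical, and it assumes the message body has already been extracted as a plain string.

```python
import re

# Matches the two message formats produced by checkBearings in main.py.
STATUS_PATTERN = re.compile(
    r"^bearing(\d+) is (suspected to fail|working in normal condition)$"
)


def parse_status(message):
    """Return a dict describing the bearing status, or None if the text does not match."""
    match = STATUS_PATTERN.match(message)
    if match is None:
        return None
    return {
        "bearing": int(match.group(1)),
        "suspected_failure": match.group(2) == "suspected to fail",
    }


print(parse_status("bearing3 is suspected to fail"))
print(parse_status("bearing1 is working in normal condition"))
```

A structured record like this is easier to filter on than free text, for example when only the suspected failures are of interest.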
Useful module commands on the Tank
List the installed modules:
iotedge list
Remove the container:
sudo docker rm -f KmeansModule
Look at the logs of the container:
sudo iotedge logs KmeansModule -f
Conclusion
The motor defect detector Python project has now been converted into an Azure IoT Edge module and deployed to an edge device. As a next step, Azure can turn those messages into actionable events with routing.
About the author
Whitney Foster is a software engineer at Intel in the Core and Visual Computing Group working on scale enabling projects for Internet of Things and Computer Vision.