Click here to Skip to main content
65,938 articles
CodeProject is changing. Read more.
Articles / artificial-intelligence / machine-learning

Building an MLOps Model API

5.00/5 (2 votes)
10 May 2021CPOL1 min read 7.3K   44  
In this article we build the model API to support the prediction service.
Here we build an API that will load our model from the production registry to enable the prediction service described in the Google MLOps Maturity Model.

In this series of articles, we’ll walk you through the process of applying CI/CD to the AI tasks. You’ll end up with a functional pipeline that meets the requirements of level 2 in the Google MLOps Maturity Model. We’re assuming that you have some familiarity with Python, Deep Learning, Docker, DevOps, and Flask.

In the previous article, we discussed the unit testing step in our ML CI/CD pipeline. In this one, we’ll build the model API to support the prediction service.

The diagram below shows where we are in our project process.

Image 1

And the code files’ structure is the following:

Image 2

Most of the code in this article is virtually the same as in the previous one, so we’ll only look at the differences.

Find the full code in this repository as the snippets shown below are condensed versions.

task.py

The task.py file, which orchestrates the program execution within the container, looks as follows:

Python
import tensorflow as tf
from tensorflow.keras.models import load_model
import jsonpickle
import data_utils, email_notifications
import sys
import os
from google.cloud import storage
import datetime
import numpy as np
import jsonpickle
import cv2
from flask import flash,Flask,Response,request,jsonify
import threading
import requests
import time
 
# IMPORTANT
# If you're running this container locally and you want to access the API via local browser, use http://172.17.0.2:5000/
 
# Starting flask app
app = Flask(__name__)
 
# general variables declaration
model_name = 'best_model.hdf5'
bucket_name = 'automatictrainingcicd-aiplatform'
global model
 
@app.before_first_request
def before_first_request():
 def initialize_job():
  if len(tf.config.experimental.list_physical_devices('GPU')) > 0:
   tf.config.set_soft_device_placement(True)
   tf.debugging.set_log_device_placement(True)
  global model
  # Checking if there's any model saved at testing on GCS
  model_gcs = data_utils.previous_model(bucket_name,model_name)
  # If any model exists at prod, load it, test it on data and use it on the API
  if model_gcs[0] == True:
   model_gcs = data_utils.load_model(bucket_name,model_name)
   if model_gcs[0] == True:
    try:
     model = load_model(model_name)
    except Exception as e:
     email_notifications.exception('Something went wrong trying to production model. Exception: '+str(e))
     sys.exit(1)
   else:
    email_notifications.exception('Something went wrong when trying to load production model. Exception: '+str(model_gcs[1]))
    sys.exit(1)
  if model_gcs[0] == False:
   email_notifications.send_update('There are no artifacts at model registry. Check GCP for more information.')
   sys.exit(1)
  if model_gcs[0] == None:
   email_notifications.exception('Something went wrong when trying to check if production model exists. Exception: '+model_gcs[1]+'. Aborting execution.')
   sys.exit(1)
 thread = threading.Thread(target=initialize_job)
 thread.start()
 
 
@app.route('/init', methods=['GET','POST'])
def init():
 message = {'message': 'API initialized.'}
 response = jsonpickle.encode(message)
 return Response(response=response, status=200, mimetype="application/json")
 
@app.route('/', methods=['POST'])
def index():
 if request.method=='POST':
  try:
   #Converting string that contains image to uint8
   image = np.fromstring(request.data,np.uint8)
   image = image.reshape((128,128,3))
   image = [image]
   image = np.array(image)
   image = image.astype(np.float16)
   result = model.predict(image)
   result = np.argmax(result)
   message = {'message': '{}'.format(str(result))}
   json_response = jsonify(message)
   return json_response
 
  except Exception as e:
   message = {'message': 'Error'}
   json_response = jsonify(message)
   email_notifications.exception('Something went wrong when trying to make prediction via Production API. Exception: '+str(e)+'. Aborting execution.')
   return json_response
 else:
  message = {'message': 'Error. Please use this API in a proper manner.'}
  json_response = jsonify(message)
  return json_response
 
def self_initialize():
 def initialization():
  global started
  started = False
  while started == False:
   try:
    server_response = requests.get('http://127.0.0.1:5000/init')
    if server_response.status_code == 200:
     print('API has started successfully, quitting initialization job.')
     started = True
   except:
    print('API has not started. Still attempting to initialize it.')
   time.sleep(3)
 thread = threading.Thread(target=initialization)
 thread.start()
 
if __name__ == '__main__':
 self_initialize()
 app.run(host='0.0.0.0',debug=True,threaded=True)

data_utils.py

The data_utils.py file differs from its previous version only in the part where it loads the model from the production registry. The differences are:

  • status = storage.Blob(bucket=bucket, name='{}/{}'.format('testing',model_filename)).exists(storage_client) by status = storage.Blob(bucket=bucket, name='{}/{}'.format('production',model_filename)).exists(storage_client)
  • blob1 = bucket.blob('{}/{}'.format('testing',model_filename)) by blob1 = bucket.blob('{}/{}'.format('production',model_filename))

Dockerfile

In our Dockerfile, replace

RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-UnitTesting.git

with

RUN git clone https://github.com/sergiovirahonda/AutomaticTraining-PredictionAPI.git

Once you have built and run the container locally, you should get a fully functional prediction service accessible at http://172.17.0.2:5000/ through POST requests.

Next Steps

In the next series of articles, we’ll see how to chain the individual containers together into an actual pipeline, with some help from Kubernetes, Jenkins, and Google Cloud Platform. Stay tuned!

License

This article, along with any associated source code and files, is licensed under The Code Project Open License (CPOL)