# flake8: noqa # This is the file that implements a flask server to do inferences. It's the file that you will modify to # implement the scoring for your own algorithm. from __future__ import print_function import os import sys sys.path.append(os.path.join(os.path.dirname(__file__), "lib")) import sagemaker_ssh_helper sagemaker_ssh_helper.setup_and_start_ssh() # <-- NEW import io import json import os import pickle # nosec B403 # sample code as is import signal import sys import traceback import flask import pandas as pd prefix = "/opt/ml/" model_path = os.path.join(prefix, "model") # A singleton for holding the model. This simply loads the model and holds it. # It has a predict function that does a prediction based on the model and the input data. class ScoringService(object): model = None # Where we keep the model when it's loaded @classmethod def get_model(cls): """Get the model object for this instance, loading it if it's not already loaded.""" if cls.model == None: with open(os.path.join(model_path, "decision-tree-model.pkl"), "rb") as inp: cls.model = pickle.load(inp) # nosec B301 # sample code as is return cls.model @classmethod def predict(cls, input): """For the input, do the predictions and return them. Args: input (a pandas dataframe): The data on which to do the predictions. There will be one prediction per row in the dataframe""" clf = cls.get_model() return clf.predict(input) # The flask app for serving predictions app = flask.Flask(__name__) @app.route("/ping", methods=["GET"]) def ping(): """Determine if the container is working and healthy. In this sample container, we declare it healthy if we can load the model successfully.""" health = ScoringService.get_model() is not None # You can insert a health check here status = 200 if health else 404 return flask.Response(response="\n", status=status, mimetype="application/json") @app.route("/invocations", methods=["POST"]) def transformation(): """Do an inference on a single batch of data. In this sample server, we take data as CSV, convert it to a pandas data frame for internal use and then convert the predictions back to CSV (which really just means one prediction per line, since there's a single column. """ data = None # Convert from CSV to pandas if flask.request.content_type == "text/csv": data = flask.request.data.decode("utf-8") s = io.StringIO(data) data = pd.read_csv(s, header=None) else: return flask.Response( response="This predictor only supports CSV data", status=415, mimetype="text/plain" ) print("Invoked with {} records".format(data.shape[0])) # Do the prediction predictions = ScoringService.predict(data) # Convert from numpy back to CSV out = io.StringIO() pd.DataFrame({"results": predictions}).to_csv(out, header=False, index=False) result = out.getvalue() return flask.Response(response=result, status=200, mimetype="text/csv")