# Orchestrating Gender Prediction on SageMaker 

Amazon SageMaker provides a powerful orchestration framework that you can use to productionize any of your own machine learning algorithm, using any machine learning framework and programming languages.<p>
This is possible because, as a manager of containers, SageMaker have standarized ways interacting with your code running inside a Docker container. Since you are free to build a docker container using whatever code and depndency you like, this gives you freedom to bring your own machinery.<p>
A key take away of this workshop is the boilerplate code necessary to package your code in specific format as required by Sagemaker.<p>


Note in the beginning that we do not need to import any SageMaker specific API, or any of your machine learning library API in order to run this notebook. This is because the actual work of model training and inference generation would happen inside the docker containers, not within the Jupyter runtime.

In [None]:
import os
import time
import boto3

Let's use some parameters to uniquely identify the production pipeline, and set some hyperparameters.

In [None]:
run_type='cpu'
instance_class = "p2" if run_type.lower()=='gpu' else "c4"
instance_type = "ml.{}.8xlarge".format(instance_class)

pipeline_name = 'gender-classifier'
run='01'

run_name = pipeline_name+"-"+run

epochs = '10'

print("Using instance type - " + instance_type)

In [None]:
#Fetch name of the S3 bucket to which this Notebook instance have access to, if not mention your own bucket name

sts = boto3.client('sts')
iam = boto3.client('iam')


caller = sts.get_caller_identity()
account = caller['Account']
arn = caller['Arn']
role = arn[arn.find("/AmazonSageMaker")+1:arn.find("/SageMaker")]
timestamp = role[role.find("Role-")+5:]
policyarn = "arn:aws:iam::{}:policy/service-role/AmazonSageMaker-ExecutionPolicy-{}".format(account, timestamp)

s3bucketname = ""
policystatements = []

try:
    policy = iam.get_policy(
        PolicyArn=policyarn
    )['Policy']
    policyversion = policy['DefaultVersionId']
    policystatements = iam.get_policy_version(
        PolicyArn = policyarn, 
        VersionId = policyversion
    )['PolicyVersion']['Document']['Statement']
except Exception as e:
    s3bucketname=input("Which S3 bucket do you want to use to host training data and model? ")
    
for stmt in policystatements:
    action = ""
    actions = stmt['Action']
    for act in actions:
        if act == "s3:ListBucket":
            action = act
            break
    if action == "s3:ListBucket":
        resource = stmt['Resource'][0]
        s3bucketname = resource[resource.find(":::")+3:]

        print(s3bucketname)

## Prepare instance

One advantage of using SageMaker hosted notebooks is that, we can access the underlying instance, in the same way as we would from an ssh session, using the Jupyter magic shell command.<p>
The boilerplate code, which we affectionately call the `Dockerizer` framework, was made available on this Notebook instance by the Lifecycle Configuration that you used. Just look into the folder and ensure the necessary files are available.

In [None]:
!ls -Rl ../container

## Container structure

Notice that the artefacts obtained from the repsitory follows the structure as shown:

    <repo home>    
    |
    ├── container
        │
        ├── byoa
        |   |
        │   ├── train
        |   |
        │   ├── predictor.py
        |   |
        │   ├── serve
        |   |
        │   ├── nginx.conf
        |   |
        │   └── wsgi.py
        |
        ├── build_and_push.sh
        │   
        ├── Dockerfile.cpu
        │        
        └── Dockerfile.gpu


* `Dockerfile` describes the container image and the accompanying script `build_and_push.sh` does the heavy lifting of building the container, and uploading it into an Amazon ECR repository
* Sagemaker containers that we'll be building serves prediction request using a Flask based application. `wsgi.py` is a wrapper to invoke the Flask application, while `nginx.conf` is the configuration for the nginx front end and `serve` is the program that launches the gunicorn server. These files can be used as-is, and are required to build the webserver stack serving prediction requests, following the architecture as shown:
![Request serving stack](images/stack.png "Request serving stack")

## Training code

The file named `train` is where we need to package the code for model creation and training. We'll write code into this file using Jupyter magic command - `writefile`.

In [None]:
os.chdir('../container')
os.getcwd()

In [None]:
if run_type == "cpu":
    !cp "Dockerfile.cpu" "Dockerfile"

if run_type == "gpu":
    !cp "Dockerfile.gpu" "Dockerfile"


First part of the file would contain the necessary imports, as ususal

In [None]:
%%writefile byoa/train
#!/usr/bin/env python3

from __future__ import print_function

import os
import json
import pickle
import sys
import traceback

import numpy as np
import pandas as pd
from numpy import genfromtxt
import keras
from keras.models import Sequential
from keras.layers import Dense, Activation, Dropout
from keras.layers import LSTM
from keras.models import load_model
from sklearn.utils import shuffle

from os import listdir, sep
from os.path import abspath, basename, isdir
from sys import argv

Next we specify the paths to training data, model and hyperparameters, as visible by the code when it runs within an instantiated container

In [None]:
%%writefile -a byoa/train

# These are the paths to where SageMaker mounts interesting things in your container.

prefix = '/opt/ml/'

input_path = prefix + 'input/data'
output_path = os.path.join(prefix, 'output')
model_path = os.path.join(prefix, 'model')
param_path = os.path.join(prefix, 'input/config/hyperparameters.json')

# This algorithm has a single channel of input data called 'training'.
# Since we run in File mode, the input files are copied to the directory specified here.
channel_name='train'
training_path = os.path.join(input_path, channel_name)
if not os.path.exists(training_path):
    training_path = os.path.join(input_path, 'training')

Inside the function named `train` is where we need to provide the code for the model to train.<p>
* The code can either fetch the data directly from an S3 bucket location, or access it from the location `/opt/ml/input/data/<channel_name>`, if specified during creation of training job.
* The code can read the hyperparameters, if any specified during training job creation, from the location `/opt/ml/input/config/hyperparametrs.json`, or pick up defaults specified locally within this function

In [None]:
%%writefile -a byoa/train

# The function to execute the training.
def train():
    print('Starting the training.')
    try:
        # Read in any hyperparameters that the user passed with the training job
        with open(param_path, 'r') as tc:
            trainingParams = json.load(tc)
        print("Hyperparameters file : " + json.dumps(trainingParams))
        #Extract the supported hyperparameters
        batch_records = int(trainingParams.get('batch_size', '128'))
        num_epochs=int(trainingParams.get('num_epochs', '10'))
        dropout_ratio=float(trainingParams.get('dropout_ratio', '0.2'))
        split_ratio=float(trainingParams.get('split_ratio', '0.2'))
        sequence_size=int(trainingParams.get('sequence_size', '512'))
        activation_function=trainingParams.get('activation_function', 'sigmoid')
        loss_function=trainingParams.get('loss_function', 'categorical_crossentropy')
        optimizer_function=trainingParams.get('optimizer_function', 'adam')
        metrics_measure=trainingParams.get('metrics_measure', 'accuracy')
        print("Hyperparameters initialized")

        # Original source of training data, which the trainer would defult to if no train channel is specified
        data_filename = "https://s3.amazonaws.com/nlp-johndoe/data/name-gender.txt"
        if os.path.exists(training_path) :
            input_files = [ os.path.join(training_path, file) for file in os.listdir(training_path) ]
            if len(input_files) == 0:
                print('There are no files in {}.\nUsing default training data set available at {}'.format(training_path, data_filename))
            else:
                data_filename = input_files[0]
        else:
            print('No training folder {}.\nUsing default training data set available at {}'.format(training_path, data_filename))
        print("Loading data from : {}".format(data_filename))



Once we have the plumbing around data and hyper parameter access in place, rest of the model creation and fiting code could be just copy paste from your preparation notebook.<p>
The benefit of having a separate preparation notebook, as we followed in the previous step, is that feature formatting, model architecture, and fitment are all well tested. Therefore we don't need to tweak things around in containers, which becomes cumbersome and time-consuming. 

In [None]:
%%writefile -a byoa/train     

        #Read training data from CSV and load into a data frame
        data=pd.read_csv(data_filename, sep=',', names = ["Name", "Gender"])
        data = shuffle(data)
        print("Training data loaded")

        #number of names
        num_names = data.shape[0]

        # length of longest name
        max_name_length = (data['Name'].map(len).max())

        #Separate data and label
        names = data['Name'].values
        genders = data['Gender']

        #Determine Alphabets in the input
        names = data['Name'].values
        txt = ""
        for n in names:
            txt += n.lower()

        #Alphabet derived as an unordered set containing unique entries of all characters used in name
        chars = sorted(set(txt))
        alphabet_size = len(chars)

        #Assign index values to each symbols in Alphabet
        char_indices = dict((str(chr(c)), i) for i, c in enumerate(range(97,123)))
        alphabet_size = 123-97
        char_indices['max_name_length'] = max_name_length

        #One hot encoding to create training-X
        X = np.zeros((num_names, max_name_length, alphabet_size))
        for i,name in enumerate(names):
            name = name.lower()
            for t, char in enumerate(name):
                X[i, t,char_indices[char]] = 1

        #Encode training-Y with 'M' as 1 and 'F' as 0
        Y = np.ones((num_names,2))
        Y[data['Gender'] == 'F',0] = 0
        Y[data['Gender'] == 'M',1] = 0

        #Shape of one-hot encoded array is equal to length of longest input string by size of Alphabet
        data_dim = alphabet_size
        timesteps = max_name_length
        print("Training data prepared")

        #Consider this as a binary classification problem
        num_classes = 2

        #Initiate a sequential model
        model = Sequential()

        # Add an LSTM layer that returns a sequence of vectors of dimension sequence size (512 by default)
        model.add(LSTM(sequence_size, return_sequences=True, input_shape=(timesteps, data_dim)))

        # Drop out certain percentage (20% by default) to prevent over fitting
        if dropout_ratio > 0 and dropout_ratio < 1:
            model.add(Dropout(dropout_ratio))

        # Stack another LSTM layer that returns a single vector of dimension sequence size (512 by default)
        model.add(LSTM(sequence_size, return_sequences=False))

        # Drop out certain percentage (20% by default) to prevent over fitting
        if dropout_ratio > 0 and dropout_ratio < 1:
            model.add(Dropout(dropout_ratio))

        # Finally add an activation layer with a chosen activation function (Sigmoid by default)
        model.add(Dense(num_classes, activation=activation_function))

        # Compile the Stacked LSTM Model with a loss function (binary_crossentropy by default),
        #optimizer function (rmsprop) and a metric for measuring model effectiveness (accuracy by default)
        model.compile(loss=loss_function, optimizer=optimizer_function, metrics=[metrics_measure])
        print("Model compiled")

        # Train the model for a number of epochs (10 by default), with a batch size (1000 by default)
        # Split a portion of trainining data (20% by default) to be used a validation data
        model.fit(X, Y, validation_split=split_ratio, epochs=num_epochs, batch_size=batch_records)
        print("Model trained")

        # Save the model artifacts and character indices under /opt/ml/model
        model_type='lstm-gender-classifier'
        model.save(os.path.join(model_path,'{}-model.h5'.format(model_type)))
        char_indices['max_name_length'] = max_name_length
        np.save(os.path.join(model_path,'{}-indices.npy'.format(model_type)), char_indices)

        print('Training complete.')
    except Exception as e:
        # Write out an error file. This will be returned as the failureReason in the
        # DescribeTrainingJob result.
        trc = traceback.format_exc()
        with open(os.path.join(output_path, 'failure'), 'w') as s:
            s.write('Exception during training: ' + str(e) + '\n' + trc)
        # Printing this causes the exception to be in the training job logs, as well.
        print('Exception during training: ' + str(e) + '\n' + trc, file=sys.stderr)
        # A non-zero exit code causes the training job to be marked as Failed.
        sys.exit(255)

if __name__ == '__main__':
    train()

    # A zero exit code causes the job to be marked a Succeeded.
    sys.exit(0)

## Inference code

The file named `predictor.py` is where we need to package the code for generating inference using the trained model that was saved into an S3 bucket location by the training code during the training job run.<p>
We'll write code into this file using Jupyter magic command - `writefile`.<p><br>
First part of the file would contain the necessary imports, as ususal.    

In [None]:
%%writefile byoa/predictor.py
# This is the file that implements a flask server to do inferences. It's the file that you will modify to
# implement the scoring for your own algorithm.

from __future__ import print_function

import os
import json
import pickle
from io import StringIO
import sys
import signal
import traceback

import numpy as np

import keras
from keras.models import Sequential
from keras.layers import Dense, Dropout
from keras.layers import Embedding
from keras.layers import LSTM
from keras.models import load_model
import flask

import tensorflow as tf

import pandas as pd

from os import listdir, sep
from os.path import abspath, basename, isdir
from sys import argv


When run within an instantiated container, SageMaker makes the trained model available locally at `/opt/ml`

In [None]:
%%writefile -a byoa/predictor.py

prefix = '/opt/ml/'
model_path = os.path.join(prefix, 'model')

The machinery to produce inference is wrapped around in a Pythonic class structure, within a `Singleton` class, aptly named - `ScoringService`.<p>
We create `Class` variables in this class to hold loaded model, character indices, tensor-flow graph, and anything else that needs to be referenced while generating prediction. 

In [None]:
%%writefile -a byoa/predictor.py

# A singleton for holding the model. This simply loads the model and holds it.
# It has a predict function that does a prediction based on the model and the input data.

class ScoringService(object):
    model_type = None           # Where we keep the model type, qualified by hyperparameters used during training
    model = None                # Where we keep the model when it's loaded
    graph = None
    indices = None              # Where we keep the indices of Alphabet when it's loaded

Generally, we have to provide class methods to load the model and related artefacts from the model path as assigned by SageMaker within the running container.<p>
Notice here that SageMaker copies the artefacts from the S3 location (as defined during model creation) into the container local file system.

In [None]:
%%writefile -a byoa/predictor.py

    @classmethod
    def get_indices(cls):
        #Get the indices for Alphabet for this instance, loading it if it's not already loaded
        if cls.indices == None:
            model_type='lstm-gender-classifier'
            index_path = os.path.join(model_path, '{}-indices.npy'.format(model_type))
            if os.path.exists(index_path):
                cls.indices = np.load(index_path).item()
            else:
                print("Character Indices not found.")
        return cls.indices

    @classmethod
    def get_model(cls):
        #Get the model object for this instance, loading it if it's not already loaded
        if cls.model == None:
            model_type='lstm-gender-classifier'
            mod_path = os.path.join(model_path, '{}-model.h5'.format(model_type))
            if os.path.exists(mod_path):
                cls.model = load_model(mod_path)
                cls.model._make_predict_function()
                cls.graph = tf.get_default_graph()
            else:
                print("LSTM Model not found.")
        return cls.model
    

Finally, inside another clas method, named `predict`, we provide the code that we used earlier to generate prediction.<p>
Only difference with our previous test prediciton (in development notebook) is that in this case, the predictor will grab the data from the `input` variable, which in turn is obtained from the HTTP request payload.

In [None]:
%%writefile -a byoa/predictor.py

    @classmethod
    def predict(cls, input):

        mod = cls.get_model()
        ind = cls.get_indices()

        result = {}

        if mod == None:
            print("Model not loaded.")
        else:
            if 'max_name_length' not in ind:
                max_name_length = 15
                alphabet_size = 26
            else:
                max_name_length = ind['max_name_length']
                ind.pop('max_name_length', None)
                alphabet_size = len(ind)

            inputs_list = input.strip('\n').split(",")
            num_inputs = len(inputs_list)

            X_test = np.zeros((num_inputs, max_name_length, alphabet_size))

            for i,name in enumerate(inputs_list):
                name = name.lower().strip('\n')
                for t, char in enumerate(name):
                    if char in ind:
                        X_test[i, t,ind[char]] = 1

            with cls.graph.as_default():
                predictions = mod.predict(X_test)

            for i,name in enumerate(inputs_list):
                result[name] = 'M' if predictions[i][0]>predictions[i][1] else 'F'
                print("{} ({})".format(inputs_list[i],"M" if predictions[i][0]>predictions[i][1] else "F"))

        return json.dumps(result)

With the prediction code captured, we move on to define the flask app, and provide a `ping`, which SageMaker uses to conduct health check on container instances that are responsible behind the hosted prediction endpoint.<p>
Here we can have the container return healthy response, with status code `200` when everythings goes well.<p>
For simplicity, we are only validating whether model has been loaded in this case. In practice, this provides opportunity extensive health check (including any external dependency check), as required.

In [None]:
%%writefile -a byoa/predictor.py

# The flask app for serving predictions
app = flask.Flask(__name__)

@app.route('/ping', methods=['GET'])
def ping():
    #Determine if the container is working and healthy.
    # Declare it healthy if we can load the model successfully.
    health = ScoringService.get_model() is not None and ScoringService.get_indices() is not None
    status = 200 if health else 404
    return flask.Response(response='\n', status=status, mimetype='application/json')


Last but not the least, we define a `transformation` method that would intercept the HTTP request coming through to the SageMaker hosted endpoint.<p>
Here we have the opportunity to decide what type of data we accept with the request. In this particular example, we are accepting only `CSV` formatted data, decoding the data, and invoking prediction.<p>
The response is similarly funneled backed to the caller with MIME type of `CSV`.<p>
You are free to choose any or multiple MIME types for your requests and response. However if you choose to do so, it is within this method that we have to transform the back to and from the format that is suitable to passed for prediction.

In [None]:
%%writefile -a byoa/predictor.py


@app.route('/invocations', methods=['POST'])
def transformation():
    #Do an inference on a single batch of data
    data = None

    # Convert from CSV to pandas
    if flask.request.content_type == 'text/csv':
        data = flask.request.data.decode('utf-8')
    else:
        return flask.Response(response='This predictor only supports CSV data', status=415, mimetype='text/plain')

    print('Invoked with {} records'.format(data.count(",")+1))

    # Do the prediction
    predictions = ScoringService.predict(data)

    result = ""
    for prediction in predictions:
        result = result + prediction

    return flask.Response(response=result, status=200, mimetype='text/csv')

Note that in containerizing our custom LSTM Algorithm, where we used `Keras` as our framework of our choice, we did not have to interact directly with the SageMaker API, even though SageMaker API doesn't support `Keras`.<p>
This serves to show the power and flexibility offered by containerized machine learning pipeline on SageMaker.

## Container publishing

Of course the code written so far in this notebook haven't sttod the test of execution so far. In order to do so, we need to actually build the `Docker` containers, publish it to `Amazon ECR` repository, and then either use SageMaker console or API to run the training hosting and deployment stages.

Conceptually, the steps required for publishing are:<p>
1. Make the `train` and `predictor.py` files executable
2. Create an ECR repository within your default region
3. Build a docker container with an identifieable name (we used a combination or model name and version as unique)
4. Tage the image and publish to the ECR repository
<p><br>
All of these ar conveniently encapsulated inside `build_and_push` script. We simply run it with the unique name of our production run.

In [None]:
!sh build_and_push.sh $run_name

## Orchestraion

At this point, we can head to ECS console, grab the ARN for the repository where we published the docker image with our training and inference code, and use SageMaker console to spawn training job, create hosted model, and endpoint.<p>
However, it is often more convenient to automate these steps. This notebook shows one way to do so, using `boto3 SageMaker` API.

In [None]:
sagemaker = boto3.client('sagemaker')

First we create a training job specifying the name of our produciton pipeline, ARN of the published image on ECR, location of available training data on S3 bucket, and desired S3 location where we need the trained model to be saved.<p>
We wait until the training job completes before proceeding to the next stage.

In [None]:
region = boto3.session.Session().region_name
response = sagemaker.create_training_job(
    TrainingJobName='{}-training'.format(run_name),
    HyperParameters={
        'num_epochs': epochs
    },
    AlgorithmSpecification={
        'TrainingImage': '{}.dkr.ecr.{}.amazonaws.com/{}:latest'.format(account,region,run_name),
        'TrainingInputMode': 'File'
    },    
    RoleArn='arn:aws:iam::{}:role/service-role/AmazonSageMaker-ExecutionRole-{}'.format(account,timestamp),
    InputDataConfig=[
        {
            'ChannelName': 'train',
            'DataSource': {
                'S3DataSource': {
                    'S3DataType': 'S3Prefix',
                    'S3Uri': 's3://{}/data'.format(s3bucketname),
                    'S3DataDistributionType': 'FullyReplicated'
                }
            },
            'CompressionType': 'None',
            'RecordWrapperType': 'None'
        },
    ],
    OutputDataConfig={
        'S3OutputPath': 's3://{}/output'.format(s3bucketname)
    },
    ResourceConfig={
        'InstanceType': instance_type,
        'InstanceCount': 1,
        'VolumeSizeInGB': 10
    },
    StoppingCondition={
        'MaxRuntimeInSeconds': 86400
    },
    Tags=[
        {
            'Key': 'Name',
            'Value': '{}-training'.format(run_name)
        }
    ]    
)
status='InProgress'
step = 0
sleep = 30
print("{} - Time Elapsed: {} seconds".format(status,step*sleep))
while status != 'Completed' and status != 'Failed':
    response = sagemaker.describe_training_job(
        TrainingJobName=run_name+'-training'
    )
    status = response['TrainingJobStatus']
    time.sleep(sleep)
    step = step+1
    print("{} - Time Elapsed: {} seconds".format(status,step*sleep))

If training succeeds, we move on to create a model hosting definition, by providing the S3 location to the model artifact, and ARN to the ECR image of the container.

In [None]:
if status == 'Completed':
    response = sagemaker.create_model(
        ModelName='{}-model'.format(run_name),
        PrimaryContainer={
            'Image': '{}.dkr.ecr.{}.amazonaws.com/{}:latest'.format(account,region,run_name),
            'ModelDataUrl': 's3://{}/output/{}-training/output/model.tar.gz'.format(s3bucketname,run_name),
            'Environment': {
                'string': 'string'
            }
        },
        ExecutionRoleArn='arn:aws:iam::{}:role/service-role/AmazonSageMaker-ExecutionRole-{}'.format(account,timestamp),
        Tags=[
            {
                'Key': 'Name',
                'Value': '{}-model'.format(run_name)
            }
        ]
    )    

Using the model hosting definition, our next step is to create configuration of a hosted endpoint that will be used to serve prediciton generation requests. 

In [None]:
response = sagemaker.create_endpoint_config(
    EndpointConfigName='{}-endpoint-config'.format(run_name),
    ProductionVariants=[
        {
            'VariantName': 'default',
            'ModelName': '{}-model'.format(run_name),
            'InitialInstanceCount': 1,
            'InstanceType': instance_type,
            'InitialVariantWeight': 1
        },
    ],
    Tags=[
        {
            'Key': 'Name',
            'Value': '{}-endpoint-config'.format(run_name)
        }
    ]
)

Creating the endpoint is the last step in the ML cycle, that prepares your model to serve client reqests from applications.<p>
We wait until provisioning of infrastrucuture needed to host the endpoint is completed and the endpoint in service.

In [None]:
response = sagemaker.create_endpoint(
    EndpointName='{}-endpoint'.format(run_name),
    EndpointConfigName='{}-endpoint-config'.format(run_name),
    Tags=[
        {
            'Key': 'string',
            'Value': run_name+'-endpoint'
        }
    ]
)
status='Creating'
step = 0
sleep = 30
print("{} - Time Elapsed: {} seconds".format(status,step*sleep))
while status != 'InService' and status != 'Failed' and status != 'OutOfService':
    response = sagemaker.describe_endpoint(
        EndpointName='{}-endpoint'.format(run_name)
    )
    status = response['EndpointStatus']
    time.sleep(sleep)
    step = step+1
    print("{} - Time Elapsed: {} seconds".format(status,step*sleep))

At the end we run a quick test to validate we are able to generate same predicitions as we did in our preparation notebook.

In [None]:
!aws sagemaker-runtime invoke-endpoint --endpoint-name "$run_name-endpoint" --body 'Tom,Allie,Jim,Sophie,John,Kayla,Mike,Amanda,Andrew' --content-type text/csv outfile
!cat outfile

Head back to Module-3 of the workshop now, to the section titled - `Integration`, and follow the steps described.<p>
You'll need to copy the endpoint name from the output of the cell below, to use in the Lambda function that will send request to this hosted endpoint.

In [None]:
print(response['EndpointName'])