# Preparing Deep Learning Training and HPO Code in a Local Sagemaker Instance for Dockerizing

This notebook shows an example of a running Bayesian HPO and also training for a regression deep neural network written in Keras with a Tensorflow backend.

## 1- Load libraries

In [134]:
from __future__ import print_function

import os
import sys
import traceback
import json
import numpy as np
import pandas as pd
import tensorflow as tf
from sklearn.model_selection import train_test_split
from sklearn import preprocessing

import warnings
warnings.simplefilter("ignore")

from keras.callbacks import EarlyStopping
from keras.callbacks import ReduceLROnPlateau
from keras.callbacks import ModelCheckpoint
from keras.layers import Dropout, Dense
from keras.wrappers.scikit_learn import KerasRegressor
from keras.layers.normalization import BatchNormalization
from keras.models import Sequential

from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder
from sklearn.model_selection import GridSearchCV
from pickle import dump

from timeit import default_timer as timer
from hyperopt import STATUS_OK
from hyperopt import hp
from hyperopt import tpe
from hyperopt import Trials
from hyperopt import fmin

## 2- Function for preparing our data

In [135]:
def data_prep(train_data):   

    if not final_training:
        skip = int(100/int(used_data_percentage))
        train_data = train_data[::skip]

    train_data = train_data.dropna()
    print(train_data.columns)

    train_data = train_data.astype('float32')
    
    train_x = train_data.drop([target], axis=1)
    train_y = train_data[target]

    train_x, val_x, train_y, val_y = train_test_split(train_x, train_y, test_size = train_validation_split)
    
    scaler = preprocessing.RobustScaler(quantile_range=(25.0, 75.0)).fit(train_x)
    dump(scaler, open(os.path.join(model_path, 'scaler.pkl'), 'wb'))
    
    train_x = scaler.transform(train_x)
    val_x = scaler.transform(val_x)

    return train_x, train_y, val_x, val_y

## 3- Function for doing a final training

In [136]:
def train_final_model(params):
    input_files = [ os.path.join(training_path, file) for file in os.listdir(training_path) ]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(training_path, channel_name))
    raw_data = [ pd.read_csv(file) for file in input_files if file.endswith('.csv')]
    raw_data = pd.concat(raw_data)
    train_x, train_y, test_x, test_y = data_prep(raw_data)
    print('data loaded')    
    start = timer()
  
    #######################################################
    model = Sequential()
    for i in range(params['num_dense_layers']-1):
        if i ==0:
            model.add(Dense(params['num_dense_nodes']['num_dense_nodes_1'], kernel_initializer='normal',input_dim = train_x.shape[1], activation='relu'))
            if batch_normalization:
                model.add(BatchNormalization())
            if include_dropout:
                model.add(Dropout(params['dropout']))
        else:
            model.add(Dense(params['num_dense_nodes']['num_dense_nodes_'+str(i+1)], kernel_initializer='normal', activation='relu'))
            if batch_normalization:
                model.add(BatchNormalization())
            if include_dropout:
                model.add(Dropout(params['dropout']))

    model.add(Dense(1, kernel_initializer='normal',activation=params['last_activation_f']))
    if batch_normalization:
        model.add(BatchNormalization())
    model.compile(loss=loss_metric, optimizer = params['optimizer'], metrics=[loss_metric])
    model.summary()

    earlyStopping = EarlyStopping(monitor= monitor_metric, patience=early_stopping_patience, verbose=0, mode='min')
    mcp_save = ModelCheckpoint('.mdl_wts.hdf5', save_best_only=True, monitor= monitor_metric, mode='min')
    reduce_lr_loss = ReduceLROnPlateau(monitor= monitor_metric, factor=0.1, patience=lr_update_patience, verbose=1, epsilon=1e-4, mode='min')

    history = model.fit(train_x, train_y,
              callbacks=[earlyStopping, mcp_save, reduce_lr_loss],
              epochs=params['nb_epochs'],
              verbose=2,
              validation_data=(test_x, test_y))

    predictions=model.predict(test_x)

    df = pd.DataFrame(columns=['Actual','Predicted'])
    df['Actual'] = test_y
    df['Predicted'] = predictions
    diff = abs(df['Actual']  - df['Predicted'])/df['Actual'] 
    q95 = diff.quantile(.95)


    ###########
    # serialize model to JSON
    model_json = model.to_json()
    with open(os.path.join(model_path, 'model.json'), "w") as json_file:
        json_file.write(model_json)
    # serialize weights to HDF5
    model.save_weights(os.path.join(model_path, 'model.h5'))
    print("Saved model to disk")
    ###########
    print('q95  {}'.format(q95))
    run_time = timer() - start


## 4- Function for doing Bayesian HPO

In [137]:
best_q95 = 10e10
def objective(params):
    """Objective function for Gradient Boosting Machine Hyperparameter Tuning"""
    
    
    input_files = [ os.path.join(training_path, file) for file in os.listdir(training_path) ]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(training_path, channel_name))
    raw_data = [ pd.read_csv(file) for file in input_files if file.endswith('.csv')]
    raw_data = pd.concat(raw_data)

    train_x, train_y, test_x, test_y = data_prep(raw_data)
    print('data loaded')

    global ITERATION
    print('Iteration: {}'.format(ITERATION))
    
    ITERATION += 1
    start = timer()
  
    #######################################################
    model = Sequential()

    for i in range(params['num_dense_layers']-1):
        if i ==0:
            model.add(Dense(params['num_dense_nodes']['num_dense_nodes_1'], kernel_initializer='normal',input_dim = train_x.shape[1], activation='relu'))
            if batch_normalization:
                model.add(BatchNormalization())
            if include_dropout:
                model.add(Dropout(params['dropout']))
        else:
            model.add(Dense(params['num_dense_nodes']['num_dense_nodes_'+str(i+1)], kernel_initializer='normal', activation='relu'))
            if batch_normalization:
                model.add(BatchNormalization())
            if include_dropout:
                model.add(Dropout(params['dropout']))

    model.add(Dense(1, kernel_initializer='normal',activation= params['last_activation']))
    if batch_normalization:
        model.add(BatchNormalization())
    model.compile(loss=loss_metric, optimizer = params['optimizer'], metrics=[loss_metric])
    #model.summary()

    earlyStopping = EarlyStopping(monitor= monitor_metric, patience=early_stopping_patience, verbose=0, mode='min')
    reduce_lr_loss = ReduceLROnPlateau(monitor= monitor_metric, factor=0.1, patience=lr_update_patience, verbose=1, epsilon=1e-4, mode='min')

    history = model.fit(train_x, train_y,
              callbacks=[earlyStopping, reduce_lr_loss],
              epochs=params['nb_epochs'],
              verbose=2,
              validation_data=(test_x, test_y))

    predictions=model.predict(test_x)

    df = pd.DataFrame(columns=['Actual','Predicted'])
    df['Actual'] = test_y
    df['Predicted'] = predictions
    diff = abs(df['Actual']  - df['Predicted'])/df['Actual'] 
    q95 = diff.quantile(.95)

    # Save the model if it improves on the best-found performance.
    # We use the global keyword so we update the variable outside
    # of this function.
    global best_q95
    global short_model_summary

    # If the classification accuracy of the saved model is improved ...
    if q95 < best_q95:
        ###########
        # serialize model to JSON
        model_json = model.to_json()
        with open(os.path.join(model_path, 'model.json'), "w") as json_file:
            json_file.write(model_json)
        # serialize weights to HDF5
        model.save_weights(os.path.join(model_path, 'model.h5'))
        
        stringlist = []
        model.summary(print_fn=lambda x: stringlist.append(x))
        short_model_summary = "".join(stringlist)

        print("Saved model to disk")
        ###########
        
        # Update the regression accuracy.
        best_q95 = q95
    print(100*'=')
    print(50*' ','      Iteration: \n', ITERATION)
    print('             q95:  \n{}'.format(q95))
    print('             best_q95:  \n {}'.format(best_q95))
    print(100*'=')
    # Delete the Keras model with these hyper-parameters from memory.
    del model
    
    #######################################################    

    run_time = timer() - start

    # Dictionary with information for evaluation
    return {'loss': q95,'params': params, 'iteration': ITERATION,
            'train_time': run_time, 'status': STATUS_OK}

## 5- Define  HyperParameters for Training and HPO (equivalent of section 5-D in the train script)   

In [144]:
# Here we define parameters for Final Training or HPO
final_training = True  # This flag switched between Final Training mode (True) and HPO mode (False)

if final_training: # If we are doing Final Training
    final_training = True
    target = 'PE_'
    batch_normalization = False
    include_dropout = False
    dropout_f = .2
    early_stopping_patience = 15
    train_validation_split = .15
    lr_update_patience = 7
    loss_metric = 'mae'
    monitor_metric = 'val_mean_absolute_error'
    num_layers_f = 8
    nodes = [1024,64,1024,32,32,64,512] # The number of nodes (length of "nodes" list) should be num_layers_f-1 because the last layer has 1 node and is automatically added
    nb_epochs_f = 3
    batch_size_f = 32
    optimizer_f = 'adam'
    last_activation_f = 'tanh'
       
else:  # If we are doing HPO
    final_training = False
    target = 'PE_'
    batch_normalization = False
    include_dropout = False
    dropout = [.2,.3,.5]
    early_stopping_patience = 15
    lr_update_patience = 7
    loss_metric = 'mae'
    monitor_metric = 'val_mean_absolute_error'
    used_data_percentage = 10
    train_validation_split = .15
    MAX_EVALS = 3
    randstate = 50
    num_layers_low = 1
    num_layers_high = 9
    choice_of_node_numbers = [16,32,64,128,256,512,1024,2048] # Here you can give the possible node size for layers. If you want to only have small number of nodes, remove the high values from this list. 
    nb_epochs = 3
    batch_size = [32,64,128]
    optimizer = ['adam']
    last_activation = ['tanh']  # Activation for the layer with one node. Options for this are 'linear' and 'tanh'
    

## 6- Putting above parameters in dictionaries that can be used by Training or HPO functions

In [145]:
if final_training:   # If we are doing Final Training
    parameters = {   'num_dense_layers': num_layers_f,
                'num_dense_nodes': {'num_dense_nodes_'+str(k+1): nodes[k] for k in range(num_layers_f-1)},
                 'batch_size' : batch_size_f,
                'nb_epochs' :  nb_epochs_f,
                'dropout' :  dropout_f,
                'optimizer': optimizer_f,
                'last_activation_f': last_activation_f
            }
else:    # If we are doing HPO
    space = {   'num_dense_layers': hp.choice('num_dense_layers', np.arange(num_layers_low, num_layers_high, dtype=int)),
                'num_dense_nodes': {'num_dense_nodes_'+str(k+1): hp.choice('num_dense_nodes_'+str(k+1), choice_of_node_numbers) for k in range(num_layers_high)},
                 'batch_size' : hp.choice('batch_size', batch_size),
                'nb_epochs' :  nb_epochs,
                'optimizer': hp.choice('optimizer',optimizer),
                'last_activation': hp.choice('last_activation',last_activation)
            }

    if include_dropout:
        space['dropout'] = hp.choice('dropout',dropout)


## 7- This is the main function which runs the final training or HPO

In [146]:
def train():
    print('Starting the training/HPO.')
    try:
        if final_training:
            print('Starting the final training...')
            train_final_model(parameters)
    
        else:
            print('Starting the HPO...')
            tpe_algorithm = tpe.suggest
            bayes_trials = Trials()

            # Global variable
            global  ITERATION

            ITERATION = 0
            # Run optimization
            best = fmin(fn = objective, space = space, algo = tpe.suggest, 
                        max_evals = MAX_EVALS, trials = bayes_trials, rstate = np.random.RandomState(randstate))


            print('Training is complete.')
            # Sort the trials with lowest loss (highest AUC) first
            print(100*'=')
            print('\n                 Best Model:\n')
            bayes_trials_results = sorted(bayes_trials.results, key = lambda x: x['loss'])
            
            print('Model Summary: \n\n',short_model_summary)
            print('\n\n\n')
            print(bayes_trials_results[0])
            print('\n\n\n')
            print(100*'=')
            
            print('\n                 2nd Best Model: \n')
            print(bayes_trials_results[1])
            print(100*'=')

            print('\n                 3rd Best Model: \n')
            print(bayes_trials_results[2])
            print(100*'=')
            
    except Exception as e:
        # Write out an error file. This will be returned as the failure
        # Reason in the DescribeTrainingJob result.
        trc = traceback.format_exc()
        with open(os.path.join(output_path, 'failure'), 'w') as s:
            s.write('Exception during training: ' + str(e) + '\n' + trc)
        # Printing this causes the exception to be in the training job logs
        print(
            'Exception during training: ' + str(e) + '\n' + trc,
            file=sys.stderr)
        # A non-zero exit code causes the training job to be marked as Failed.
        sys.exit(255)

## 8- Define directories for data and model artifacts (equivalent of section 8-D in the train script) 

In [147]:
training_path = 'data'
output_path = '../opt/ml/output' # You can create this outside of current directory.
model_path = '../opt/ml/model'# You can create this outside of current directory.

## 9- Run train() function

In [148]:
if __name__ == '__main__':
    train()

    # A zero exit code causes the job to be marked a Succeeded.
    sys.exit(0)

Starting the training.
Starting the final training...
Index(['P0', 'P1', 'P2', 'P3', 'P4', 'P5', 'P6', 'P7', 'P8', 'P9', 'P11',
       'P12', 'P13', 'P14', 'P16', 'P17', 'P18', 'P19', 'P20', 'P21', 'P22',
       'P23', 'P24', 'P25', 'P26', 'P27', 'AREA_RATIO_', 'SPEED_', 'PE_'],
      dtype='object')
data loaded
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_49 (Dense)             (None, 1024)              29696     
_________________________________________________________________
dense_50 (Dense)             (None, 64)                65600     
_________________________________________________________________
dense_51 (Dense)             (None, 1024)              66560     
_________________________________________________________________
dense_52 (Dense)             (None, 32)                32800     
_________________________________________________________________
dense_53 (Dense)          

SystemExit: 0

## 10- Define functions for local inference

In [154]:
# This is the file that implements a flask server to do inferences. It's the
# file that you will modify to implement the scoring for your own algorithm.
from __future__ import print_function

import os
try:
    from StringIO import StringIO ## for Python 2
except ImportError:
    from io import StringIO ## for Python 3
    
import flask
from keras.layers import Dropout, Dense
from keras.wrappers.scikit_learn import KerasRegressor
from keras.models import Sequential

import tensorflow as tf
import numpy as np
import pandas as pd
from pickle import load

#############################
from tensorflow import Graph, Session
from keras import backend as K
graph = Graph()

#############################

from keras.models import load_model
from sklearn.preprocessing import StandardScaler
from keras.models import model_from_json

import h5py
from sklearn.preprocessing import LabelEncoder, OneHotEncoder

# prefix = '/opt/ml/'
# model_path = os.path.join(prefix, 'model')

prefix = '../opt/ml/'
model_path = os.path.join(prefix, 'model')


# A singleton for holding the model. This simply loads the model and holds it.
# It has a predict function that does a prediction based on the model and the
# input data.

def loadmodel(weightFile, jsonFile):    
    # load json and create model
    json_file = open(jsonFile, 'r')
    loaded_model_json = json_file.read()
    json_file.close()
    reg = model_from_json(loaded_model_json)
    # load weights into new model
    reg.load_weights(weightFile)
    print("Loaded model from disk")
    return reg
   

class ScoringService(object):
    model = None                # Where we keep the model when it's loaded

    @classmethod
    def get_model(cls):
        """
        Get the model object for this instance,
        loading it if it's not already loaded.
        """
        if cls.model is None:
            cls.model = loadmodel(os.path.join(model_path, 'model.h5'),os.path.join(model_path, 'model.json'))
        return cls.model

    
    @classmethod
    def predict(cls,input):
        """For the input, do the predictions and return them.

        Args:
            input (a pandas dataframe): The data on which to do the
            predictions.

            There will be one prediction per row in the dataframe
        """
        sess = K.get_session()
        with sess.graph.as_default():
            clf = cls.get_model()
            return clf.predict(input)

def transform_data(dataset):
    dataset = dataset.dropna()
    dataset = dataset.astype('float32')
    scaler = load(open(os.path.join(model_path, 'scaler.pkl'), 'rb'))

    # Feature Scaling
    dataset = scaler.fit_transform(dataset)
    return pd.DataFrame(dataset)


# # The flask app for serving predictions
# app = flask.Flask(__name__)


# @app.route('/ping', methods=['GET'])
# def ping():
#     """
#     Determine if the container is working and healthy.
#     In this sample container, we declare it healthy if we can load the model
#     successfully.
#     """

#     # Health check -- You can insert a health check here
#     health = ScoringService.get_model() is not None
#     status = 200 if health else 404
#     return flask.Response(
#         response='\n',
#         status=status,
#         mimetype='application/json')
# @app.route('/invocations', methods=['POST'])


# 11- Do inference

In [155]:
def transformation():
    """
    Do an inference on a single batch of data. In this sample server, we take
    data as CSV, convert it to a pandas data frame for internal use and then
    convert the predictions back to CSV (which really just means one prediction
    per line, since there's a single column.
    """
    data = None

    # Convert from CSV to pandas
    s = '../test_mod.csv'   # MODIFIED
    data = pd.read_csv(s, header=None)
    data = transform_data(data)
    # Do the prediction
    predictions = ScoringService.predict(data)

#     # Convert from numpy back to CSV
#     out = StringIO()
#     pd.DataFrame(predictions).to_csv(out, header=False, index=False)
#     result = out.getvalue()

    return predictions   # MODIFIED
#     return result, predictions


transformation()

Loaded model from disk


array([[0.6766174 ],
       [0.5600138 ],
       [0.692883  ],
       ...,
       [0.7234832 ],
       [0.60982114],
       [0.75358176]], dtype=float32)

## 12- Inference function in docker: 

In [None]:
def transformation():
    """
    Do an inference on a single batch of data. In this sample server, we take
    data as CSV, convert it to a pandas data frame for internal use and then
    convert the predictions back to CSV (which really just means one prediction
    per line, since there's a single column.
    """
    data = None

    # Convert from CSV to pandas
    if flask.request.content_type == 'text/csv':
        data = flask.request.data.decode('utf-8')
        s = StringIO(data)
        data = pd.read_csv(s, header=None)
        data = transform_data(data)
    else:
        return flask.Response(response='This predictor only supports CSV data',status=415, mimetype='text/plain')

    print('Invoked with {} records'.format(data.shape[0]))

    # Do the prediction
    predictions = ScoringService.predict(data)

    # Convert from numpy back to CSV
    out = StringIO()
    pd.DataFrame(predictions).to_csv(out, header=False, index=False)
    result = out.getvalue()

    return flask.Response(response=result, status=200, mimetype='text/csv')


In [22]:
!pip list

Package                            Version   
---------------------------------- ----------
absl-py                            0.9.0     
alabaster                          0.7.10    
anaconda-client                    1.6.14    
anaconda-project                   0.8.2     
asn1crypto                         0.24.0    
astor                              0.8.1     
astroid                            1.6.3     
astropy                            3.0.2     
attrs                              18.1.0    
Automat                            0.3.0     
autovizwidget                      0.15.0    
awscli                             1.18.39   
Babel                              2.5.3     
backcall                           0.1.0     
backports.shutil-get-terminal-size 1.0.0     
bcrypt                             3.1.7     
beautifulsoup4                     4.6.0     
bitarray                           0.8.1     
bkcharts                           0.2       
blaze        

You should consider upgrading via the 'pip install --upgrade pip' command.[0m
