#!/usr/bin/env python

########################################################################################################
######################               Section 1- Load libraries               ##########################
########################################################################################################
from __future__ import print_function

import os
import sys
import traceback
import json
import warnings

warnings.simplefilter("ignore")

import numpy as np
import pandas as pd
import tensorflow as tf

from keras.callbacks import EarlyStopping
from keras.callbacks import ReduceLROnPlateau
from keras.callbacks import ModelCheckpoint
from keras.layers import Dropout, Dense
from keras.layers.normalization import BatchNormalization
from keras.models import Sequential
from keras.wrappers.scikit_learn import KerasRegressor

from sklearn.model_selection import train_test_split
from sklearn.model_selection import GridSearchCV
from sklearn import preprocessing
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import LabelEncoder

from pickle import dump
from timeit import default_timer as timer

from hyperopt import STATUS_OK
from hyperopt import hp
from hyperopt import tpe     # optimization algorithm
from hyperopt import Trials  # Trials object to track progress
from hyperopt import fmin

########################################################################################################
# Section 8-D: Define directories for data and model artifacts (equivalent of section 8 in the notebook)
########################################################################################################

# Optional: silence most TensorFlow C++ log output.
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2'

# These are the paths where SageMaker mounts interesting things in your
# container.
prefix = '/opt/ml/'
input_path = prefix + 'input/data'
param_path = os.path.join(prefix, 'input/config/hyperparameters.json')
output_path = os.path.join(prefix, 'output')
model_path = os.path.join(prefix, 'model')

# This algorithm has a single channel of input data called 'training'.
# Since we run in File mode, the input files are copied to the directory
# specified here.
channel_name = 'training'
training_path = os.path.join(input_path, channel_name)

########################################################################################################
###  Section 5-D: Define HyperParameters for Training and HPO (equivalent of section 5 in the notebook)
########################################################################################################

# Read in any hyperparameters that the user passed with the training job.
with open(param_path, 'r') as tc:
    trainingParams = json.load(tc)

# Hyperparameters are always passed in as strings, so we need to do any
# necessary conversions.
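# Illustrative only: a hyperparameters.json of the kind SageMaker writes to
# /opt/ml/input/config. Every value arrives as a string (hence the
# eval()/int()/float() conversions below); the concrete values here are
# hypothetical.
#
# {
#     "final_training": "False",
#     "target": "PE_",
#     "used_data_percentage": "10",
#     "choice_of_node_numbers": "[16, 32, 64, 128]",
#     "MAX_EVALS": "10"
# }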
final_training = trainingParams.get('final_training', None)
# eval() on None raises a TypeError, so only convert when the key was passed.
if final_training is not None:
    final_training = eval(final_training)
else:
    final_training = False

target = trainingParams.get('target', None)  # Arbitrary string
if target is None:
    target = 'PE_'

batch_normalization = trainingParams.get('batch_normalization', None)  # Default should be False
if batch_normalization is not None:
    batch_normalization = eval(batch_normalization)
else:
    batch_normalization = False

include_dropout = trainingParams.get('include_dropout', None)  # Default should be False
if include_dropout is not None:
    include_dropout = eval(include_dropout)
else:
    include_dropout = False

# Single string (not arbitrary); options are "mse" and "mae". Default: "mae".
loss_metric = trainingParams.get('loss_metric', None)
if loss_metric is None:
    loss_metric = 'mae'

# Single string (not arbitrary); options are "val_mean_absolute_error",
# "val_mean_squared_error", "val_loss", "mean_absolute_error",
# "mean_squared_error" and "loss".
monitor_metric = trainingParams.get('monitor_metric', None)
if monitor_metric is None:
    monitor_metric = 'val_mean_absolute_error'

# Learning rate update patience. An arbitrary integer. Default: 7.
lr_update_patience = trainingParams.get('lr_update_patience', None)
if lr_update_patience is not None:
    lr_update_patience = int(lr_update_patience)
else:
    lr_update_patience = 7

# Early stopping patience. An arbitrary integer. Default: 15.
early_stopping_patience = trainingParams.get('early_stopping_patience', None)
if early_stopping_patience is not None:
    early_stopping_patience = int(early_stopping_patience)
else:
    early_stopping_patience = 15

# A list of floats in [0, 1]. Similar to choice_of_node_numbers below
# (with floats instead of integers).
dropout = trainingParams.get('dropout', None)
if dropout is not None:
    dropout = eval(dropout)
else:
    dropout = [.2, .5]

# A float in [0, 1], exactly like train_validation_split below.
dropout_f = trainingParams.get('dropout_f', None)
if dropout_f is not None:
    dropout_f = float(dropout_f)
else:
    dropout_f = .2

train_validation_split = trainingParams.get('train_validation_split', None)
if train_validation_split is not None:
    train_validation_split = float(train_validation_split)
else:
    train_validation_split = .15

used_data_percentage = trainingParams.get('used_data_percentage', None)
if used_data_percentage is not None:
    used_data_percentage = int(used_data_percentage)
else:
    used_data_percentage = 10

choice_of_node_numbers = trainingParams.get('choice_of_node_numbers', None)
if choice_of_node_numbers is not None:
    choice_of_node_numbers = eval(choice_of_node_numbers)
else:
    choice_of_node_numbers = [16, 32, 64, 128, 256, 512, 1024, 2048]

batch_size = trainingParams.get('batch_size', None)
if batch_size is not None:
    batch_size = eval(batch_size)
else:
    batch_size = [32]

MAX_EVALS = trainingParams.get('MAX_EVALS', None)
if MAX_EVALS is not None:
    MAX_EVALS = int(MAX_EVALS)
else:
    MAX_EVALS = 10

randstate = trainingParams.get('randstate', None)
if randstate is not None:
    randstate = int(randstate)
else:
    randstate = 50

num_layers_low = trainingParams.get('num_layers_low', None)
if num_layers_low is not None:
    num_layers_low = int(num_layers_low)
else:
    num_layers_low = 2

num_layers_high = trainingParams.get('num_layers_high', None)
if num_layers_high is not None:
    num_layers_high = int(num_layers_high)
else:
    num_layers_high = 3

nb_epochs = trainingParams.get('nb_epochs', None)
if nb_epochs is not None:
    nb_epochs = int(nb_epochs)
else:
    nb_epochs = 5

optimizer = trainingParams.get('optimizer', None)
if optimizer is not None:
    optimizer = eval(optimizer)
else:
    optimizer = ['adam']

last_activation = trainingParams.get('last_activation', None)
if last_activation is not None:
    last_activation = eval(last_activation)
else:
    last_activation = ['tanh']

num_layers_f = trainingParams.get('num_layers_f', None)
if num_layers_f is not None:
    num_layers_f = int(num_layers_f)
else:
    num_layers_f = 8

nb_epochs_f = trainingParams.get('nb_epochs_f', None)
if nb_epochs_f is not None:
    nb_epochs_f = int(nb_epochs_f)
else:
    nb_epochs_f = 100

batch_size_f = trainingParams.get('batch_size_f', None)
if batch_size_f is not None:
    batch_size_f = int(batch_size_f)
else:
    batch_size_f = 32

optimizer_f = trainingParams.get('optimizer_f', None)
if optimizer_f is None:
    optimizer_f = 'adam'

last_activation_f = trainingParams.get('last_activation_f', None)
if last_activation_f is None:
    last_activation_f = 'tanh'

nodes = trainingParams.get('nodes', None)
if nodes is not None:
    nodes = eval(nodes)
else:
    nodes = [1024, 524, 256, 128, 64, 32, 16]
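# Note: eval() works here because the list/bool hyperparameters arrive as
# Python literals such as "[16, 32]" or "True", but eval() will execute any
# code it is given. A minimal, safer sketch using the standard library
# (ast.literal_eval parses literals only and rejects everything else):
#
#   import ast
#   choice_of_node_numbers = ast.literal_eval(
#       trainingParams.get('choice_of_node_numbers', '[16, 32, 64]'))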
########################################################################################################
################        Section 2- Function for preparing our data           ##########################
########################################################################################################


def data_prep(train_data):
    # During HPO, subsample the rows to used_data_percentage to speed things up.
    if not final_training:
        skip = int(100 / int(used_data_percentage))
        train_data = train_data[::skip]
    train_data = train_data.dropna()
    print(train_data.columns)
    train_data = train_data.astype('float32')
    train_x = train_data.drop([target], axis=1)
    train_y = train_data[target]
    train_x, val_x, train_y, val_y = train_test_split(train_x, train_y,
                                                      test_size=train_validation_split)
    # Fit the scaler on the training split only, persist it next to the model
    # artifacts, and apply it to both splits.
    scaler = preprocessing.RobustScaler(quantile_range=(25.0, 75.0)).fit(train_x)
    with open(os.path.join(model_path, 'scaler.pkl'), 'wb') as f:
        dump(scaler, f)
    train_x = scaler.transform(train_x)
    val_x = scaler.transform(val_x)
    return train_x, train_y, val_x, val_y


########################################################################################################
################        Section 3- Function for doing a final training       ##########################
########################################################################################################


def train_final_model(params):
    input_files = [os.path.join(training_path, file) for file in os.listdir(training_path)]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(training_path, channel_name))
    raw_data = [pd.read_csv(file) for file in input_files if file.endswith('.csv')]
    raw_data = pd.concat(raw_data)
    train_x, train_y, test_x, test_y = data_prep(raw_data)
    print('data loaded')

    start = timer()

    #######################################################
    model = Sequential()
    for i in range(params['num_dense_layers'] - 1):
        if i == 0:
            # The first hidden layer needs the input dimension.
            model.add(Dense(params['num_dense_nodes']['num_dense_nodes_1'],
                            kernel_initializer='normal',
                            input_dim=train_x.shape[1],
                            activation='relu'))
            if batch_normalization:
                model.add(BatchNormalization())
            if include_dropout:
                model.add(Dropout(params['dropout']))
        else:
            model.add(Dense(params['num_dense_nodes']['num_dense_nodes_' + str(i + 1)],
                            kernel_initializer='normal',
                            activation='relu'))
            if batch_normalization:
                model.add(BatchNormalization())
            if include_dropout:
                model.add(Dropout(params['dropout']))
    model.add(Dense(1, kernel_initializer='normal', activation=params['last_activation_f']))
    if batch_normalization:
        model.add(BatchNormalization())

    model.compile(loss=loss_metric, optimizer=params['optimizer'], metrics=[loss_metric])
    model.summary()

    earlyStopping = EarlyStopping(monitor=monitor_metric, patience=early_stopping_patience,
                                  verbose=0, mode='min')
    mcp_save = ModelCheckpoint('.mdl_wts.hdf5', save_best_only=True,
                               monitor=monitor_metric, mode='min')
    reduce_lr_loss = ReduceLROnPlateau(monitor=monitor_metric, factor=0.1,
                                       patience=lr_update_patience, verbose=1,
                                       epsilon=1e-4, mode='min')

    history = model.fit(train_x, train_y,
                        callbacks=[earlyStopping, mcp_save, reduce_lr_loss],
                        epochs=params['nb_epochs'],
                        batch_size=params['batch_size'],
                        verbose=2,
                        validation_data=(test_x, test_y))

    predictions = model.predict(test_x)
    df = pd.DataFrame(columns=['Actual', 'Predicted'])
    df['Actual'] = test_y
    df['Predicted'] = predictions.flatten()
    diff = abs(df['Actual'] - df['Predicted']) / df['Actual']
    q95 = diff.quantile(.95)

    ###########
    # Serialize the model architecture to JSON ...
    model_json = model.to_json()
    with open(os.path.join(model_path, 'model.json'), "w") as json_file:
        json_file.write(model_json)
    # ... and the weights to HDF5.
    model.save_weights(os.path.join(model_path, 'model.h5'))
    print("Saved model to disk")
    ###########

    print('q95 {}'.format(q95))
    run_time = timer() - start
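# The q95 score used throughout is the 95th percentile of the relative
# absolute error |actual - predicted| / actual. A standalone sketch with
# hypothetical values:
#
#   import numpy as np
#   import pandas as pd
#   actual = pd.Series([100.0, 200.0, 400.0])
#   predicted = np.array([110.0, 190.0, 380.0])
#   q95 = (abs(actual - predicted) / actual).quantile(.95)  # ~0.095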
########################################################################################################
################        Section 4- Function for doing Bayesian HPO           ##########################
########################################################################################################

best_q95 = 10e10


def objective(params):
    """Objective function for Keras network hyperparameter tuning."""
    input_files = [os.path.join(training_path, file) for file in os.listdir(training_path)]
    if len(input_files) == 0:
        raise ValueError(('There are no files in {}.\n' +
                          'This usually indicates that the channel ({}) was incorrectly specified,\n' +
                          'the data specification in S3 was incorrectly specified or the role specified\n' +
                          'does not have permission to access the data.').format(training_path, channel_name))
    raw_data = [pd.read_csv(file) for file in input_files if file.endswith('.csv')]
    raw_data = pd.concat(raw_data)
    train_x, train_y, test_x, test_y = data_prep(raw_data)
    print('data loaded')

    global ITERATION
    print('Iteration: {}'.format(ITERATION))
    ITERATION += 1

    start = timer()

    #######################################################
    model = Sequential()
    for i in range(params['num_dense_layers'] - 1):
        if i == 0:
            # The first hidden layer needs the input dimension.
            model.add(Dense(params['num_dense_nodes']['num_dense_nodes_1'],
                            kernel_initializer='normal',
                            input_dim=train_x.shape[1],
                            activation='relu'))
            if batch_normalization:
                model.add(BatchNormalization())
            if include_dropout:
                model.add(Dropout(params['dropout']))
        else:
            model.add(Dense(params['num_dense_nodes']['num_dense_nodes_' + str(i + 1)],
                            kernel_initializer='normal',
                            activation='relu'))
            if batch_normalization:
                model.add(BatchNormalization())
            if include_dropout:
                model.add(Dropout(params['dropout']))
    model.add(Dense(1, kernel_initializer='normal', activation=params['last_activation']))
    if batch_normalization:
        model.add(BatchNormalization())

    model.compile(loss=loss_metric, optimizer=params['optimizer'], metrics=[loss_metric])
    # model.summary()

    earlyStopping = EarlyStopping(monitor=monitor_metric, patience=early_stopping_patience,
                                  verbose=0, mode='min')
    reduce_lr_loss = ReduceLROnPlateau(monitor=monitor_metric, factor=0.1,
                                       patience=lr_update_patience, verbose=1,
                                       epsilon=1e-4, mode='min')

    history = model.fit(train_x, train_y,
                        callbacks=[earlyStopping, reduce_lr_loss],
                        epochs=params['nb_epochs'],
                        batch_size=params['batch_size'],
                        verbose=2,
                        validation_data=(test_x, test_y))

    predictions = model.predict(test_x)
    df = pd.DataFrame(columns=['Actual', 'Predicted'])
    df['Actual'] = test_y
    df['Predicted'] = predictions.flatten()
    diff = abs(df['Actual'] - df['Predicted']) / df['Actual']
    q95 = diff.quantile(.95)

    # Save the model if it improves on the best-found performance.
    # We use the global keyword so we update the variables outside
    # of this function.
    global best_q95
    global short_model_summary
    # If the regression error of this model improves on the best so far ...
    if q95 < best_q95:
        ###########
        # Serialize the model architecture to JSON ...
        model_json = model.to_json()
        with open(os.path.join(model_path, 'model.json'), "w") as json_file:
            json_file.write(model_json)
        # ... and the weights to HDF5.
        model.save_weights(os.path.join(model_path, 'model.h5'))
        stringlist = []
        model.summary(print_fn=lambda x: stringlist.append(x))
        short_model_summary = "\n".join(stringlist)
        print("Saved model to disk")
        ###########
        # Update the best regression error.
        best_q95 = q95

    print(100 * '=')
    print(50 * ' ', ' Iteration: \n', ITERATION)
    print(' q95: \n{}'.format(q95))
    print(' best_q95: \n {}'.format(best_q95))
    print(100 * '=')

    # Delete the Keras model with these hyper-parameters from memory.
    del model

    #######################################################
    run_time = timer() - start

    # Dictionary with information for evaluation.
    return {'loss': q95, 'params': params, 'iteration': ITERATION,
            'train_time': run_time, 'status': STATUS_OK}
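# hyperopt minimizes the 'loss' key of the dictionary returned by
# objective(); the other keys are simply carried along in the Trials object.
# One returned record might look like this (values hypothetical; STATUS_OK
# is the string 'ok'):
#
#   {'loss': 0.12, 'params': {...}, 'iteration': 3,
#    'train_time': 41.7, 'status': 'ok'}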
########################################################################################################
#   Section 6- Putting hyperparameters in dictionaries that can be used by Training or HPO functions
########################################################################################################

if final_training:
    # Final training: fixed hyperparameter values.
    parameters = {
        'num_dense_layers': num_layers_f,
        'num_dense_nodes': {'num_dense_nodes_' + str(k + 1): nodes[k]
                            for k in range(num_layers_f - 1)},
        'batch_size': batch_size_f,
        'nb_epochs': nb_epochs_f,
        'dropout': dropout_f,
        'optimizer': optimizer_f,
        'last_activation_f': last_activation_f
    }
else:
    # HPO: a hyperopt search space to sample from.
    space = {
        'num_dense_layers': hp.choice('num_dense_layers',
                                      np.arange(num_layers_low, num_layers_high, dtype=int)),
        'num_dense_nodes': {'num_dense_nodes_' + str(k + 1):
                                hp.choice('num_dense_nodes_' + str(k + 1), choice_of_node_numbers)
                            for k in range(num_layers_high)},
        'batch_size': hp.choice('batch_size', batch_size),
        'nb_epochs': nb_epochs,
        'optimizer': hp.choice('optimizer', optimizer),
        'last_activation': hp.choice('last_activation', last_activation)
    }
    if include_dropout:
        space['dropout'] = hp.choice('dropout', dropout)
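# For orientation: one sample drawn from `space` by hyperopt might look like
# the following (values hypothetical). Note that num_dense_nodes always holds
# num_layers_high entries, even though only num_dense_layers - 1 are used:
#
#   {'num_dense_layers': 2,
#    'num_dense_nodes': {'num_dense_nodes_1': 64,
#                        'num_dense_nodes_2': 256,
#                        'num_dense_nodes_3': 32},
#    'batch_size': 32, 'nb_epochs': 5,
#    'optimizer': 'adam', 'last_activation': 'tanh'}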
########################################################################################################
#        Section 7- This is the main function which runs the final training or HPO
########################################################################################################


def train():
    print('Starting the training.')
    try:
        if final_training:
            print('Starting the final training...')
            train_final_model(parameters)
        else:
            print('Starting the HPO...')
            tpe_algorithm = tpe.suggest
            bayes_trials = Trials()

            # Global variable
            global ITERATION
            ITERATION = 0

            # Run optimization
            best = fmin(fn=objective,
                        space=space,
                        algo=tpe_algorithm,
                        max_evals=MAX_EVALS,
                        trials=bayes_trials,
                        rstate=np.random.RandomState(randstate))

            print('Training is complete.')

            # Sort the trials so the lowest loss (best q95) comes first.
            print(100 * '=')
            print('\n Best Model:\n')
            bayes_trials_results = sorted(bayes_trials.results, key=lambda x: x['loss'])
            print('Model Summary: \n\n', short_model_summary)
            print('\n\n\n')
            print(bayes_trials_results[0])
            print('\n\n\n')
            print(100 * '=')
            print('\n 2nd Best Model: \n')
            print(bayes_trials_results[1])
            print(100 * '=')
            print('\n 3rd Best Model: \n')
            print(bayes_trials_results[2])
            print(100 * '=')
    except Exception as e:
        # Write out an error file. This will be returned as the failure
        # reason in the DescribeTrainingJob result.
        trc = traceback.format_exc()
        with open(os.path.join(output_path, 'failure'), 'w') as s:
            s.write('Exception during training: ' + str(e) + '\n' + trc)
        # Printing this causes the exception to be in the training job logs.
        print('Exception during training: ' + str(e) + '\n' + trc, file=sys.stderr)
        # A non-zero exit code causes the training job to be marked as Failed.
        sys.exit(255)


########################################################################################################
################            Section 9- Run train() function                  ##########################
########################################################################################################

if __name__ == '__main__':
    train()

    # A zero exit code causes the job to be marked as Succeeded.
    sys.exit(0)
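# Local smoke test outside SageMaker (the directory layout follows the
# SageMaker container contract; the CSV name and the assumption that this
# file is installed as the container's 'train' entrypoint are hypothetical):
#
#   mkdir -p /opt/ml/input/data/training /opt/ml/input/config \
#            /opt/ml/model /opt/ml/output
#   cp my_data.csv /opt/ml/input/data/training/
#   echo '{"final_training": "False"}' > /opt/ml/input/config/hyperparameters.json
#   python train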