#!/usr/bin/env python3
################################################
#       Time series forecasts using            #
#       a combo RNN architecture with          #
#       GRU and covariates                     #
################################################

from __future__ import print_function

import os
import json
import sys
import traceback

import numpy as np
import pandas as pd

from sklearn import preprocessing
from keras.models import Model, load_model
from keras.layers import Input, GRU, Dense, concatenate
import tensorflow as tf

# These are the paths where SageMaker mounts interesting things in your container.
prefix = '/opt/ml/'

input_path = prefix + 'input/data'
output_path = os.path.join(prefix, 'output')
model_path = os.path.join(prefix, 'model')
param_path = os.path.join(prefix, 'input/config/hyperparameters.json')
saved_param_path = os.path.join(model_path, 'hyperparameters.json')
loss_history_path = os.path.join(model_path, 'loss_history.csv')

# This algorithm has a single channel of input data, named 'train' (with a
# fallback to 'training'). Since we run in File mode, the input files are
# copied to the directory specified here.
channel_name = 'train'
training_path = os.path.join(input_path, channel_name)
if not os.path.exists(training_path):
    training_path = os.path.join(input_path, 'training')

# In case the training channel is not specified, we keep a provision to read the
# training data directly from the workshop bucket. This also demonstrates that
# inside your custom training file you can pull data from literally anywhere;
# you are not limited to using data from an S3 bucket only.
s3_bucket = 'fsv-309'
s3_data_key = 'dbg-stockdata/source'


# Function to normalize ML inputs.
# Inputs have to be scaled to limit the range of input/output values flowing
# through the network's affine transformations.
def normalize_data(df):
    # Differencing (order 1) is applied because the use case (stock prices)
    # tends to trend over time (non-stationary).
    df = df.diff()
    df = df.replace(np.nan, 0)
    scaler = preprocessing.StandardScaler()  # or: MinMaxScaler(feature_range=(0, 1))
    for feat in df.columns:
        transformed_feature = scaler.fit_transform(df[feat].values.reshape(-1, 1))
        df[feat] = transformed_feature
    return df


# Function to load data for training.
# This function splits the data into training and test sets according to the
# specified percentage, and builds samples in which `lag` consecutive
# observations are concatenated as X and the following `horizon` observations
# are concatenated as Y.
def load_data_for_training(stock, percent_train, lag, horizon):
    data = stock.values
    lags = []
    horizons = []
    nsample = len(data) - lag - horizon  # number of samples in the 3D tensor
    for i in range(nsample):
        lags.append(data[i: i + lag, :])
        horizons.append(data[i + lag: i + lag + horizon, -1])
    lags = np.array(lags)
    horizons = np.array(horizons)
    print("Number of horizons (train + test): ", len(horizons))
    row = round(percent_train * lags.shape[0] / 100)  # e.g. a 90% split when percent_train=90
    x_train = lags[:int(row), :]      # training share of dates, all feature lags
    y_train = horizons[:int(row), :]  # training share of dates, y horizons (single target column)
    return [x_train, y_train]


def date_part(dt):
    return str(dt).split(' ')[0]

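# Illustrative sketch (not invoked by the training flow): shows the windowing
# performed by load_data_for_training on a toy single-column series. With
# lag=3, horizon=2 and 10 observations, 5 windows exist; the 80% split keeps 4,
# so X has shape (4, 3, 1) and y has shape (4, 2).
def _demo_windowing():
    toy = pd.DataFrame({'TargetMetric': np.arange(10, dtype=float)})
    x, y = load_data_for_training(toy, percent_train=80, lag=3, horizon=2)
    print(x.shape, y.shape)  # (4, 3, 1) (4, 2)
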
# This is the core of the training code.
# This function is invoked by SageMaker on your container when you submit the
# image for training by creating an Estimator and calling fit() with the
# specified training channel.
def train():
    print('Starting the training.')
    try:
        # Read in any hyperparameters that the user passed with the training job
        with open(param_path, 'r') as tc:
            trainingParams = json.load(tc)
        print("Hyperparameters file : " + json.dumps(trainingParams))

        # Extract the supported hyperparameters

        # Whether to use data resampled at a daily or hourly interval
        interval = trainingParams.get('interval', 'D').upper()
        assert interval in ('D', 'H')
        trainingParams['interval'] = interval

        # Lag specifies how many periods' data is used as X
        lag = int(trainingParams.get('lag', '10'))
        trainingParams['lag'] = str(lag)

        # Horizon specifies how many periods ahead the model predicts
        horizon = int(trainingParams.get('horizon', '5'))
        trainingParams['horizon'] = str(horizon)

        # How many epochs to run
        num_epochs = int(trainingParams.get('num_epochs', '1000'))
        trainingParams['num_epochs'] = str(num_epochs)

        # What percentage of the data is reserved for training
        percent_train = float(trainingParams.get('percent_train', '80.0'))
        trainingParams['percent_train'] = str(percent_train)

        # Number of units in each GRU layer
        num_units = int(trainingParams.get('num_units', '256'))
        trainingParams['num_units'] = str(num_units)

        # Batch size specifies the number of samples to process at one time
        batch_size = int(trainingParams.get('batch_size', '4096'))
        trainingParams['batch_size'] = str(batch_size)

        # Weight of the main output's loss relative to the combined output's loss
        yweight = float(trainingParams.get('yweight', '0.8'))
        trainingParams['yweight'] = str(yweight)

        # Which stock symbol to predict for; specifies the time series used as the main series
        target_stock = trainingParams.get('target_stock', 'BMW').upper()
        trainingParams['target_stock'] = target_stock

        # Which stock symbols to use as covariates; specifies the time series used as exogenous series
        covariate_stocks = trainingParams.get('covariate_stocks', 'CON, DAI, PAH3, VOW3').upper()
        trainingParams['covariate_stocks'] = covariate_stocks
        covariates = covariate_stocks.split(',')
        for i, cov in enumerate(covariates):
            covariates[i] = cov.strip()

        # Which column to predict, defaulting to closing price
        target_column = trainingParams.get('target_column', 'EndPrice')
        trainingParams['target_column'] = target_column

        # Which columns to use as covariates
        covariate_columns = trainingParams.get('covariate_columns', 'StartPrice, MinPrice, MaxPrice')
        trainingParams['covariate_columns'] = covariate_columns
        covariate_columns = covariate_columns.split(',')
        for i, covcol in enumerate(covariate_columns):
            covariate_columns[i] = covcol.strip()

        # Activation function
        activation_function = trainingParams.get('activation_function', 'elu').lower()
        trainingParams['activation_function'] = activation_function

        # Recurrent activation function
        recurrent_activation_function = trainingParams.get('recurrent_activation_function', 'hard_sigmoid').lower()
        trainingParams['recurrent_activation_function'] = recurrent_activation_function

        # Dense activation function
        dense_activation_function = trainingParams.get('dense_activation_function', 'linear').lower()
        trainingParams['dense_activation_function'] = dense_activation_function

        # Optimizer to be used
        optimizer = trainingParams.get('optimizer', 'adam').lower()
        trainingParams['optimizer'] = optimizer

        # Dropout ratio, used to prevent overfitting
        dropout_ratio = float(trainingParams.get('dropout_ratio', '0.1'))
        trainingParams['dropout_ratio'] = str(dropout_ratio)

        # Loss metric to be used for optimization
        lossmetric = trainingParams.get('lossmetric', 'mean_absolute_error').lower()
        trainingParams['lossmetric'] = lossmetric

        # Whether to output training progress to the log
        verbose = int(trainingParams.get('verbose', '1'))
        trainingParams['verbose'] = str(verbose)
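
        # For reference, a hyperparameters.json using all the defaults above
        # would look like this (SageMaker passes every value as a string):
        # {
        #   "interval": "D", "lag": "10", "horizon": "5", "num_epochs": "1000",
        #   "percent_train": "80.0", "num_units": "256", "batch_size": "4096",
        #   "yweight": "0.8", "target_stock": "BMW",
        #   "covariate_stocks": "CON, DAI, PAH3, VOW3",
        #   "target_column": "EndPrice",
        #   "covariate_columns": "StartPrice, MinPrice, MaxPrice",
        #   "activation_function": "elu",
        #   "recurrent_activation_function": "hard_sigmoid",
        #   "dense_activation_function": "linear", "optimizer": "adam",
        #   "dropout_ratio": "0.1", "lossmetric": "mean_absolute_error",
        #   "verbose": "1"
        # }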

        # Save hyperparameters to the same location as the model output, to be
        # used during prediction, so that the main and exogenous series, the
        # lag, and the horizon can be recovered.
        with open(saved_param_path, 'w') as outfile:
            json.dump(trainingParams, outfile)
        print("Hyperparameters initialized")

        # Original source of training data, which the trainer defaults to if no
        # train channel is specified
        data_filename = "s3://{}/{}/{}/resampled_stockdata.csv".format(s3_bucket, s3_data_key, interval)

        if os.path.exists(training_path):
            input_files = [os.path.join(training_path, file) for file in os.listdir(training_path)]
            if len(input_files) == 0:
                print('There are no files in {}. Using default training data set available at {}'.format(training_path, data_filename))
            else:
                data_filename = input_files[0]
        else:
            print('No training folder {}. Using default training data set available at {}'.format(training_path, data_filename))

        # Read training data from CSV and load it into a data frame
        print("Loading data from : {}".format(data_filename))
        df = pd.read_csv(data_filename, index_col=0, parse_dates=True)
        print("Training data loaded")

        all_mnemonics = df.Mnemonic.unique()
        print("{} Stock symbols found.".format(len(all_mnemonics)))
        unique_days = sorted(list(set(map(date_part, list(df.index.unique())))))
        print("Records for {} trading days found.".format(len(unique_days)))

        data = df[df.Mnemonic == target_stock]
        train_size = round(percent_train * data.shape[0] / 100)
        test_size = data.shape[0] - train_size
        testsamples = []
        traindata = data[:int(train_size)]
        testdata = data[int(train_size):]

        # Carve the test region into non-overlapping windows of lag + horizon + 1 records
        span = lag + horizon + 1
        num_test_samples = int(test_size / span)
        for i in range(0, num_test_samples):
            j = i + 1
            start = span * i
            end = span * j
            testsamples.append(data[int(train_size) + start: int(train_size) + end])

        logoutput = []
        for n, covariate_stock in enumerate(covariates):
            if target_stock != covariate_stock:
                logoutput.append("{}-{}".format(n, covariate_stock))
                if len(logoutput) == 13:
                    print("\t".join(logoutput))
                    logoutput = []
                d1 = df[df.Mnemonic == covariate_stock]
                traindata = traindata.append(d1[:int(train_size)])
                testdata = testdata.append(d1[int(train_size):])
                for i in range(0, num_test_samples):
                    j = i + 1
                    start = span * i
                    end = span * j
                    testsamples[i] = testsamples[i].append(d1[int(train_size) + start: int(train_size) + end])
        print("\t".join(logoutput))

        # Save to file (training data is needed during prediction when rescaling new observations)
        trainfile = 'traindata.csv'
        testfile = 'testdata.csv'
        traindata.to_csv(os.path.join(model_path, trainfile))
        testdata.to_csv(os.path.join(model_path, testfile))
        for i, testsample in enumerate(testsamples):
            testsamplefile = 'test{}.csv'.format(i)
            testsample.to_csv(os.path.join(model_path, testsamplefile))

        # Main time series
        print('Rescaling ', target_stock)
        train_main = df[df.Mnemonic == target_stock].copy()
        train_main['TargetMetric'] = train_main[target_column]
        train_main.drop(['Mnemonic', target_column], axis=1, inplace=True)
        train_cols = train_main.columns.values
        for col in train_cols:
            if col != 'TargetMetric' and col not in covariate_columns:
                train_main.drop(col, axis=1, inplace=True)
        train_main = normalize_data(train_main)
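
        # Note on scaling (illustrative): normalize_data first-differences each
        # column before standardizing, so a trending series such as
        # [100.0, 101.0, 103.0, 106.0] becomes [0, 1, 2, 3] prior to scaling.
        # Predictions therefore live in differenced, standardized space and
        # must be inverted (using the saved training data) at inference time.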

        # Exogenous time series
        train_exo = pd.DataFrame()
        train_exo['CalcDateTime'] = pd.to_datetime(pd.Series(sorted(list(df.index.unique()))), infer_datetime_format=True)
        train_exo.index = train_exo['CalcDateTime']
        train_exo.drop('CalcDateTime', axis=1, inplace=True)

        logoutput = []
        for n, covariate_stock in enumerate(covariates):
            if target_stock != covariate_stock:
                exo = df[df.Mnemonic == covariate_stock].copy()
                if exo.shape[0] <= 0:
                    print("No records for {}, skipping".format(covariate_stock))
                else:
                    logoutput.append('Rescaling {}'.format(covariate_stock))
                    if len(logoutput) == 7:
                        print("\t".join(logoutput))
                        logoutput = []
                    exo['TargetMetric'] = exo[target_column]
                    exo.drop(['Mnemonic', target_column], axis=1, inplace=True)
                    train_cols = exo.columns.values
                    for col in train_cols:
                        if col != 'TargetMetric' and col not in covariate_columns:
                            exo.drop(col, axis=1, inplace=True)
                    exo = normalize_data(exo)
                    exo = exo.sort_index()
                    for col in exo.columns.values:
                        metric_col = exo[col].to_frame()
                        metric_col.columns = ["{}-{}".format(covariate_stock, col)]
                        # add() aligns on the shared datetime index; since each
                        # column name is unique, this effectively joins the new
                        # column onto the exogenous frame.
                        train_exo = train_exo.add(metric_col, fill_value=0)
        print("\t".join(logoutput))
        train_exo.dropna(how='all', inplace=True)
        print("\n", train_main.shape, train_exo.shape)

        Xmain_train, ymain_train = load_data_for_training(train_main, percent_train, lag, horizon)
        Xexo_train, dummy = load_data_for_training(train_exo, percent_train, lag, horizon)
        print(Xmain_train.shape, ymain_train.shape, Xexo_train.shape)

        #################################################
        #                                               #
        #      Define and train RNN deep learning       #
        #                                               #
        #################################################

        # Create a dynamic network based on Gated Recurrent Units (GRU) for the target
        main_in = Input(shape=(lag, Xmain_train.shape[2]), dtype='float32', name='main_in')
        main_gru = GRU(units=num_units,
                       return_sequences=False,
                       activation=activation_function,
                       recurrent_activation=recurrent_activation_function,
                       dropout=dropout_ratio)(main_in)
        main_out = Dense(horizon, activation=dense_activation_function, name='main_out')(main_gru)

        # Create a dynamic network based on Gated Recurrent Units (GRU) for the covariates
        exo_in = Input(shape=(lag, Xexo_train.shape[2]), dtype='float32', name='exo_in')
        exo_gru = GRU(units=num_units,
                      return_sequences=False,
                      activation=activation_function,
                      recurrent_activation=recurrent_activation_function,
                      dropout=dropout_ratio,
                      name='grulayer')(exo_in)
        # Note: exo_out is defined but not wired into the model outputs below;
        # only main_out and combo_out contribute to the loss.
        exo_out = Dense(horizon, activation=dense_activation_function, name='exo_out')(exo_gru)

        # Merge the two resulting layers
        z = concatenate([main_gru, exo_gru])

        # Create a dense layer for all merged data
        combo_out = Dense(horizon, activation=activation_function, name='combo_out')(z)

        # Define final model input/output flows and compile.
        # The effective loss is yweight * main_out_loss + (1 - yweight) * combo_out_loss.
        xyweight = 1 - yweight  # complement of the yweight hyperparameter
        model = Model(inputs=[main_in, exo_in], outputs=[main_out, combo_out])
        model.compile(optimizer=optimizer, loss=lossmetric, loss_weights=[yweight, xyweight])

        # Train the model
        history = model.fit({'main_in': Xmain_train, 'exo_in': Xexo_train},
                            {'main_out': ymain_train, 'combo_out': ymain_train},
                            epochs=num_epochs,
                            batch_size=batch_size,
                            verbose=verbose)

        # Find the epoch with the lowest combined-output loss
        minloss = float('inf')
        minlossepoch = 0
        for i, loss in enumerate(history.history['combo_out_loss']):
            if loss <= minloss:
                minloss = loss
                minlossepoch = i + 1
        print("Minimum Loss : {}, occurred at Epoch - {}".format(minloss, minlossepoch))

        # Save loss history
        lossdf = pd.DataFrame(data={
            "loss": history.history['loss'],
            "main_out_loss": history.history['main_out_loss'],
            "combo_out_loss": history.history['combo_out_loss']})
        lossdf.to_csv(loss_history_path, sep=',', index=False)
        print('Loss History saved')
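
        # At this point the model directory (/opt/ml/model) holds everything the
        # inference code needs: hyperparameters.json, traindata.csv, testdata.csv,
        # the per-window test{i}.csv files, loss_history.csv, and (after the save
        # below) the trained model itself.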

        # Save the model
        model.save(os.path.join(model_path, 'rnn-combo-model.h5'))
        print('Training complete')

    except Exception as e:
        # Write out an error file. This will be returned as the failureReason
        # in the DescribeTrainingJob result.
        trc = traceback.format_exc()
        with open(os.path.join(output_path, 'failure'), 'w') as s:
            s.write('Exception during training: ' + str(e) + '\n' + trc)
        # Printing this causes the exception to be in the training job logs, as well.
        print('Exception during training: ' + str(e) + '\n' + trc, file=sys.stderr)
        # A non-zero exit code causes the training job to be marked as Failed.
        sys.exit(255)


if __name__ == '__main__':
    train()

    # A zero exit code causes the training job to be marked as Succeeded.
    sys.exit(0)
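
# Example launch from a SageMaker notebook (illustrative sketch; the image URI,
# role, bucket, and instance type below are placeholders, and argument names
# differ between SageMaker Python SDK v1 and v2 -- this shows the v2 names):
#
#   from sagemaker.estimator import Estimator
#
#   estimator = Estimator(
#       image_uri='<account>.dkr.ecr.<region>.amazonaws.com/rnn-combo-gru:latest',
#       role=role,
#       instance_count=1,
#       instance_type='ml.p3.2xlarge',
#       hyperparameters={'target_stock': 'BMW', 'interval': 'D', 'num_epochs': '1000'})
#
#   estimator.fit({'train': 's3://<bucket>/dbg-stockdata/source/D/'})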