#!/usr/bin/env python

# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# A copy of the License is located at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# or in the "license" file accompanying this file. This file is distributed
# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
# express or implied. See the License for the specific language governing
# permissions and limitations under the License.
#
# A sample training component that trains a Keras text classification model.

from __future__ import print_function

import json
import os
import pickle
import sys
import traceback

import boto3
import pandas as pd
import tensorflow as tf
from sklearn.feature_extraction import _stop_words
from tensorflow.keras.losses import CategoricalCrossentropy

# Model and preprocessing constants.
max_features = 5000  # maximum vocabulary size
maxlen = 100         # maximum sequence length
embedding_dim = 50
stop_words = _stop_words.ENGLISH_STOP_WORDS

# These are the paths where SageMaker mounts data and model artifacts in your container.
model_path = os.environ.get('SM_MODEL_DIR', '/opt/ml/model')
input_path = os.environ.get('SM_CHANNEL_TRAIN', '/opt/ml/input/data')
output_path = os.environ.get('SM_OUTPUT_DIR', '/opt/ml/output')

# This bucket should be updated based on the value in the
# "Part 2: Bring Your Own Model to an Active Learning Workflow" notebook
# after the preprocessing is done.
tokenizer_bucket = ''
tokenizer_key = 'sagemaker-byoal/tokenizer.pickle'

# There is a minor path difference between the location of the input when run from
# the notebook and when run from the Step Functions workflow, so this function looks
# for the file in both paths.
# Note: the hyperparameters are ignored to keep this example simple.
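# Each line of the manifest read below is expected to be a JSON object with at
# least a 'source' field (the text) and a 'category' field (the label), which
# are the keys parsed in get_keras_input. An illustrative (hypothetical)
# example line:
#
#   {"source": "fed raises interest rates again", "category": "Business"}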
def get_training_file():
    """Return the path to the training manifest, checking both known locations."""
    train_file_paths = ['/opt/ml/input/data/train-manifest',
                        '/opt/ml/input/data/training/train-manifest']
    for file in train_file_paths:
        if os.path.isfile(file):
            return file
    raise Exception("train-manifest not found in expected locations {}".format(",".join(train_file_paths)))


def get_keras_input(inp_file):
    """Parse a manifest file into padded input sequences and one-hot labels."""
    rows = []
    for line in inp_file:
        line = line.strip()
        if not line:  # skip blank lines, e.g. a trailing newline
            continue
        train_data = json.loads(line)
        rows.append({'CATEGORY': train_data['category'], 'TITLE': train_data['source']})
    tf_train = pd.DataFrame(rows, columns=['TITLE', 'CATEGORY'])

    # Lower-case the titles, strip punctuation, and remove English stop words.
    tf_train['TITLE'] = tf_train['TITLE'].str.lower().str.replace(r'[^\w\s]', '', regex=True)
    tf_train['TITLE'] = tf_train['TITLE'].apply(
        lambda x: ' '.join(word for word in x.split() if word not in stop_words))
    tf_train.dropna(inplace=True)

    # Encode the string labels as integer codes; `categories` keeps the
    # code-to-label mapping in case it is needed for debugging.
    categories = tf_train['CATEGORY'].astype('category').cat.categories
    tf_train['CATEGORY'] = tf_train['CATEGORY'].astype('category').cat.codes
    y_train_int = tf_train['CATEGORY'].values

    # Convert integer labels to categorical one-hot encoding.
    y_train = tf.keras.utils.to_categorical(y_train_int, num_classes=4)

    # Reuse the tokenizer fitted during preprocessing so the vocabulary
    # matches the one used at inference time.
    pickle_file_name = tokenizer_key.split('/')[-1]
    boto3.resource('s3').Bucket(tokenizer_bucket).download_file(tokenizer_key, pickle_file_name)
    with open(pickle_file_name, 'rb') as handle:
        tok = pickle.load(handle)

    # Turn the cleaned titles into integer sequences and pad them to a fixed length.
    sequences = tok.texts_to_sequences(list(tf_train['TITLE']))
    X_train = tf.keras.preprocessing.sequence.pad_sequences(sequences, maxlen=maxlen)
    vocab_size = len(tok.word_index) + 1
    return X_train, y_train, vocab_size


def get_validation_data():
    """Return (X, y) for the validation manifest, checking both known locations."""
    validation_files = ['/opt/ml/input/data/validation/validation-manifest',
                        '/opt/ml/input/data/validation-manifest']
    for val_file in validation_files:
        if os.path.isfile(val_file):
            with open(val_file, 'r') as f:
                return get_keras_input(f)[:2]
    raise Exception("validation-manifest not found in expected locations {}".format(",".join(validation_files)))


# The function to execute the training.
def train():
    print('Starting the training with input_path {}'.format(input_path))
    try:
        with open(get_training_file(), 'r') as train_file:
            X_train, y_train, vocab_size = get_keras_input(train_file)

        model = tf.keras.models.Sequential([
            tf.keras.layers.Embedding(input_dim=vocab_size,      # vocabulary size
                                      output_dim=embedding_dim,  # embedding dimension
                                      input_length=maxlen),      # maximum length of an input sequence
            tf.keras.layers.GlobalMaxPool1D(),  # max pooling over the time dimension
            tf.keras.layers.Dropout(0.3),       # dropout to reduce overfitting
            # Output layer: a Dense layer with 4 class probabilities. Softmax is
            # the typical final activation for multiclass classification problems.
            tf.keras.layers.Dense(4, activation=tf.nn.softmax)
        ])
        model.compile(optimizer=tf.keras.optimizers.Nadam(learning_rate=1e-3),
                      loss=CategoricalCrossentropy(label_smoothing=0.1),
                      metrics=['accuracy'])

        # Training loss is used instead of validation loss for the stopping
        # condition to increase confidence in the predicted labels.
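        # EarlyStopping below therefore monitors 'loss' (the training loss),
        # and restore_best_weights rolls the model back to the best epoch seen.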
        early_stopping_cb = tf.keras.callbacks.EarlyStopping(monitor='loss', patience=2,
                                                             restore_best_weights=True)
        checkpoint_cb = tf.keras.callbacks.ModelCheckpoint("keras_model.h5", save_best_only=True)
        history = model.fit(X_train, y_train,
                            epochs=100,
                            validation_data=get_validation_data(),
                            callbacks=[checkpoint_cb, early_stopping_cb])

        model_file_name = os.path.join(model_path, "keras_news_classifier_model.h5")
        model.save(model_file_name)
    except Exception as e:
        # Write out an error file. This will be returned as the failureReason in the
        # DescribeTrainingJob result.
        trc = traceback.format_exc()
        with open(os.path.join(output_path, 'failure'), 'w') as s:
            s.write('Exception during training: ' + str(e) + '\n' + trc)
        # Printing this causes the exception to appear in the training job logs as well.
        print('Exception during training: ' + str(e) + '\n' + trc, file=sys.stderr)
        # A non-zero exit code causes the training job to be marked as Failed.
        sys.exit(255)


if __name__ == '__main__':
    print(pd.__version__)
    train()

    # A zero exit code causes the job to be marked as Succeeded.
    sys.exit(0)
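
# Note on invocation: this script reads SageMaker's SM_MODEL_DIR,
# SM_CHANNEL_TRAIN, and SM_OUTPUT_DIR environment variables when they are set
# and falls back to the standard /opt/ml/* container paths otherwise, so it is
# expected to work both as a script-mode entry point and as the `train`
# executable inside a bring-your-own training container.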