# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"). You
# may not use this file except in compliance with the License. A copy of
# the License is located at
#
#     http://aws.amazon.com/apache2.0/
#
# or in the "license" file accompanying this file. This file is
# distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF
# ANY KIND, either express or implied. See the License for the specific
# language governing permissions and limitations under the License.
from __future__ import absolute_import, print_function

import os

import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.layers import Activation, Conv2D, Dense, Dropout, Flatten, MaxPooling2D
from tensorflow.keras.models import Sequential
from tensorflow.keras.preprocessing.image import ImageDataGenerator

from trainer.environment import create_trainer_environment

NUM_CLASSES = 10
EPOCHS = 10
NUM_PREDICTIONS = 20
MODEL_NAME = "keras_cifar10_trained_model.h5"

# The trainer environment contains useful information about the training job,
# e.g. the hyperparameters, the input data channels, and the model directory.
env = create_trainer_environment()
print("creating SageMaker trainer environment:\n%s" % str(env))

# getting the hyperparameters
batch_size = env.hyperparameters.get("batch_size", object_type=int)
data_augmentation = env.hyperparameters.get("data_augmentation", default=True, object_type=bool)
learning_rate = env.hyperparameters.get("learning_rate", default=0.0001, object_type=float)
width_shift_range = env.hyperparameters.get("width_shift_range", object_type=float)
height_shift_range = env.hyperparameters.get("height_shift_range", object_type=float)
EPOCHS = env.hyperparameters.get("epochs", default=10, object_type=int)

# reading data from train and test channels
train_data = np.load(os.path.join(env.channel_dirs["train"], "cifar-10-npz-compressed.npz"))
(x_train, y_train) = train_data["x"], train_data["y"]

test_data = np.load(os.path.join(env.channel_dirs["test"], "cifar-10-npz-compressed.npz"))
(x_test, y_test) = test_data["x"], test_data["y"]
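# NOTE (assumption, not part of the original script): the npz archives are
# presumed to hold one-hot encoded labels, since the model below is compiled
# with categorical_crossentropy. If a channel instead stores integer class
# indices, this guard converts them; it is a no-op for one-hot labels.
if y_train.ndim == 1 or y_train.shape[-1] != NUM_CLASSES:
    y_train = keras.utils.to_categorical(y_train, NUM_CLASSES)
    y_test = keras.utils.to_categorical(y_test, NUM_CLASSES)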
# Define a small CNN: two convolutional blocks followed by a dense classifier.
model = Sequential()
model.add(Conv2D(32, (3, 3), padding="same", input_shape=x_train.shape[1:]))
model.add(Activation("relu"))
model.add(Conv2D(32, (3, 3)))
model.add(Activation("relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Conv2D(64, (3, 3), padding="same"))
model.add(Activation("relu"))
model.add(Conv2D(64, (3, 3)))
model.add(Activation("relu"))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))

model.add(Flatten())
model.add(Dense(512))
model.add(Activation("relu"))
model.add(Dropout(0.5))
model.add(Dense(NUM_CLASSES))
model.add(Activation("softmax"))

# initiate RMSprop optimizer
opt = keras.optimizers.RMSprop(lr=learning_rate, decay=1e-6)

# Let's train the model using RMSprop
model.compile(loss="categorical_crossentropy", optimizer=opt, metrics=["accuracy"])

# Scale pixel values from [0, 255] to [0, 1].
x_train = x_train.astype("float32")
x_test = x_test.astype("float32")
x_train /= 255
x_test /= 255

if not data_augmentation:
    print("Not using data augmentation.")
    model.fit(
        x_train,
        y_train,
        batch_size=batch_size,
        epochs=EPOCHS,
        validation_data=(x_test, y_test),
        shuffle=True,
    )
else:
    print("Using real-time data augmentation.")
    # This will do preprocessing and real-time data augmentation:
    data_generator = ImageDataGenerator(
        featurewise_center=False,  # set input mean to 0 over the dataset
        samplewise_center=False,  # set each sample mean to 0
        featurewise_std_normalization=False,  # divide inputs by std of the dataset
        samplewise_std_normalization=False,  # divide each input by its std
        zca_whitening=False,  # apply ZCA whitening
        rotation_range=0,  # randomly rotate images in the range (degrees, 0 to 180)
        width_shift_range=width_shift_range,  # randomly shift images horizontally (fraction of total width)
        height_shift_range=height_shift_range,  # randomly shift images vertically (fraction of total height)
        horizontal_flip=True,  # randomly flip images horizontally
        vertical_flip=False,  # do not flip images vertically
    )

    # Compute quantities required for feature-wise normalization
    # (std, mean, and principal components if ZCA whitening is applied).
    data_generator.fit(x_train)

    # Fit the model on the batches generated by data_generator.flow().
    data_generator_flow = data_generator.flow(x_train, y_train, batch_size=batch_size)
    model.fit_generator(
        data_generator_flow, epochs=EPOCHS, validation_data=(x_test, y_test), workers=4
    )

# Save model and weights
model_path = os.path.join(env.model_dir, MODEL_NAME)
model.save(model_path)
print("Saved trained model at %s " % model_path)

# Score trained model.
scores = model.evaluate(x_test, y_test, verbose=1)
print("Test loss:", scores[0])
print("Test accuracy:", scores[1])
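# A minimal sanity check of the saved artifact (an addition, not part of the
# original training flow): reload the .h5 file and classify a handful of test
# images, assuming the same tensorflow.keras version is used to restore it.
restored_model = keras.models.load_model(model_path)
predictions = restored_model.predict(x_test[:NUM_PREDICTIONS])
print("Predicted classes for %d test samples:" % NUM_PREDICTIONS, np.argmax(predictions, axis=1))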