# Copyright 2019 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Licensed under the Apache License, Version 2.0 (the "License"). # You may not use this file except in compliance with the License. # A copy of the License is located at # # https://aws.amazon.com/apache-2-0/ # # or in the "license" file accompanying this file. This file is distributed # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either # express or implied. See the License for the specific language governing # permissions and limitations under the License. from __future__ import print_function import argparse import math import os import horovod.tensorflow.keras as hvd import numpy as np import tensorflow as tf from tensorflow.keras import backend as K # import tensorflow.keras as keras from tensorflow.keras.datasets import mnist from tensorflow.keras.layers import Conv2D, Dense, Dropout, Flatten, MaxPooling2D from tensorflow.keras.models import Sequential if __name__ == "__main__": num_gpus = int(os.environ["SM_NUM_GPUS"]) parser = argparse.ArgumentParser() # Data, model, and output directories. These are required. parser.add_argument("--output-dir", type=str, default=os.environ["SM_OUTPUT_DIR"]) parser.add_argument("--model_dir", type=str) parser.add_argument("--train", type=str, default=os.environ["SM_CHANNEL_TRAIN"]) parser.add_argument("--test", type=str, default=os.environ["SM_CHANNEL_TEST"]) args, _ = parser.parse_known_args() # Horovod: initialize Horovod. hvd.init() # Horovod: pin GPU to be used to process local rank (one GPU per process) config = tf.ConfigProto() config.gpu_options.allow_growth = True config.gpu_options.visible_device_list = str(hvd.local_rank()) K.set_session(tf.Session(config=config)) batch_size = 128 num_classes = 10 # Horovod: adjust number of epochs based on number of GPUs. epochs = int(math.ceil(12.0 / hvd.size())) # Input image dimensions img_rows, img_cols = 28, 28 # The data, shuffled and split between train and test sets x_train = np.load(os.path.join(args.train, "train.npz"))["data"] y_train = np.load(os.path.join(args.train, "train.npz"))["labels"] print("Train dataset loaded from: {}".format(os.path.join(args.train, "train.npz"))) x_test = np.load(os.path.join(args.test, "test.npz"))["data"] y_test = np.load(os.path.join(args.test, "test.npz"))["labels"] print("Test dataset loaded from: {}".format(os.path.join(args.test, "test.npz"))) if K.image_data_format() == "channels_first": x_train = x_train.reshape(x_train.shape[0], 1, img_rows, img_cols) x_test = x_test.reshape(x_test.shape[0], 1, img_rows, img_cols) input_shape = (1, img_rows, img_cols) else: x_train = x_train.reshape(x_train.shape[0], img_rows, img_cols, 1) x_test = x_test.reshape(x_test.shape[0], img_rows, img_cols, 1) input_shape = (img_rows, img_cols, 1) x_train = x_train.astype("float32") x_test = x_test.astype("float32") x_train /= 255 x_test /= 255 print("x_train shape:", x_train.shape) print(x_train.shape[0], "train samples") print(x_test.shape[0], "test samples") # Convert class vectors to binary class matrices y_train = tf.keras.utils.to_categorical(y_train, num_classes) y_test = tf.keras.utils.to_categorical(y_test, num_classes) model = Sequential() model.add(Conv2D(32, kernel_size=(3, 3), activation="relu", input_shape=input_shape)) model.add(Conv2D(64, (3, 3), activation="relu")) model.add(MaxPooling2D(pool_size=(2, 2))) model.add(Dropout(0.25)) model.add(Flatten()) model.add(Dense(128, activation="relu")) model.add(Dropout(0.5)) model.add(Dense(num_classes, activation="softmax")) # Horovod: adjust learning rate based on number of GPUs. opt = tf.keras.optimizers.Adadelta(1.0 * hvd.size()) # Horovod: add Horovod Distributed Optimizer. opt = hvd.DistributedOptimizer(opt) model.compile( loss=tf.keras.losses.categorical_crossentropy, optimizer=opt, metrics=["accuracy"] ) callbacks = [ # Horovod: broadcast initial variable states from rank 0 to all other processes. # This is necessary to ensure consistent initialization of all workers when # training is started with random weights or restored from a checkpoint. hvd.callbacks.BroadcastGlobalVariablesCallback(0), ] # Horovod: save checkpoints only on worker 0 to prevent other workers from corrupting them. if hvd.rank() == 0: callbacks.append(tf.keras.callbacks.ModelCheckpoint("./checkpoint-{epoch}.h5")) model.fit( x_train, y_train, batch_size=batch_size, callbacks=callbacks, epochs=epochs, verbose=1, validation_data=(x_test, y_test), ) score = model.evaluate(x_test, y_test, verbose=0) print("Test loss:", score[0]) print("Test accuracy:", score[1]) # Horovod: Save model only on worker 0 (i.e. master) if hvd.rank() == 0: model.save(os.path.join(args.model_dir, "model.h5"))