{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# TensorFlow Training and using checkpointing on SageMaker Managed Spot Training\n", "\n", "The example here is almost the same as [Train and Host a Keras Model with Pipe Mode and Horovod on Amazon SageMaker](https://github.com/aws/amazon-sagemaker-examples/blob/master/sagemaker-python-sdk/keras_script_mode_pipe_mode_horovod/tensorflow_keras_CIFAR10.ipynb).\n", "\n", "This notebook tackles the exact same problem with the same solution, but it has been modified to be able to run using SageMaker Managed Spot infrastructure. SageMaker Managed Spot uses [EC2 Spot Instances](https://aws.amazon.com/ec2/spot/) to run Training at a lower cost.\n", "\n", "Please read the original notebook and try it out to gain an understanding of the ML use-case and how it is being solved. We will not delve into that here in this notebook.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup\n", "\n", "First, we define a few variables that are be needed later in the example." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sagemaker\n", "from sagemaker import get_execution_role\n", "\n", "sagemaker_session = sagemaker.Session()\n", "\n", "role = get_execution_role()\n", "print('SageMaker version: ' + sagemaker.__version__)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## The CIFAR-10 dataset\n", "\n", "The [CIFAR-10 dataset](https://www.cs.toronto.edu/~kriz/cifar.html) is one of the most popular machine learning datasets. It consists of 60,000 32x32 images belonging to 10 different classes (6,000 images per class). Here are the classes in the dataset, as well as 10 random images from each:\n", "\n", "![cifar10](https://maet3608.github.io/nuts-ml/_images/cifar10.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepare the dataset for training\n", "\n", "To use the CIFAR-10 dataset, we first download it and convert it to TFRecords. This step takes around 5 minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!python generate_cifar10_tfrecords.py --data-dir ./data" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we upload the data to Amazon S3:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.s3 import S3Uploader\n", "\n", "bucket = sagemaker_session.default_bucket()\n", "prefix = 'tf-cifar10-example'\n", "\n", "dataset_uri = S3Uploader.upload('data', 's3://{}/{}/data'.format(bucket,prefix))\n", "print('Training Dataset location: {}'.format(dataset_uri))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Train the model\n", "\n", "In this tutorial, we train a deep CNN to learn a classification task with the CIFAR-10 dataset. We compare three different training jobs: a baseline training job, training with Pipe Mode, and distributed training with Horovod.\n", "\n", "### Run a baseline training job on SageMaker\n", "\n", "The SageMaker Python SDK's `sagemaker.tensorflow.TensorFlow` estimator class makes it easy for us to interact with SageMaker. We create one for each of the different training jobs we run in this example. A couple parameters worth noting:\n", "\n", "* `entry_point`: our training script (adapted from [this Keras example](https://github.com/keras-team/keras/blob/master/examples/cifar10_cnn.py)).\n", "* `instance_count`: the number of training instances. 
Here, we set it to 1 for our baseline training job.\n", "\n", "As we run each training job, we change the relevant parameters to configure it.\n", "\n", "For more details about the TensorFlow estimator class, see the [API documentation](https://sagemaker.readthedocs.io/en/stable/sagemaker.tensorflow.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Verify the training code\n", "\n", "Before running the baseline training job, we first use [the SageMaker Python SDK's Local Mode feature](https://sagemaker.readthedocs.io/en/stable/overview.html#local-mode) to check that our code works with SageMaker's TensorFlow environment. Local Mode downloads the [prebuilt Docker image for TensorFlow](https://docs.aws.amazon.com/deep-learning-containers/latest/devguide/deep-learning-containers-images.html) and runs a Docker container locally for a training job. In other words, it simulates the SageMaker environment for a quicker development cycle, so we use it here just to test out our code.\n", "\n", "We create a TensorFlow estimator and specify the `instance_type` to be `'local'` or `'local_gpu'`, depending on whether a GPU is available locally. This tells the estimator to run our training job locally (as opposed to on SageMaker). We also have our training code run for only one epoch because our intent here is to verify the code, not train an accurate model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import subprocess\n", "\n", "from sagemaker.tensorflow import TensorFlow\n", "\n", "instance_type = 'local'\n", "\n", "if subprocess.call('nvidia-smi') == 0:\n", " # Set instance type to GPU if one is present\n", " instance_type = 'local_gpu'\n", " \n", "local_hyperparameters = {'epochs': 1, 'batch-size': 64}\n", "\n", "estimator = TensorFlow(entry_point='cifar10_keras_main.py',\n", " source_dir='source_dir',\n", " role=role,\n", " framework_version='1.15.2',\n", " py_version='py3',\n", " hyperparameters=local_hyperparameters,\n", " instance_count=1,\n", " instance_type=instance_type)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once we have our estimator, we call `fit()` to start the training job and pass the inputs that we prepared earlier. We pass the inputs as a dictionary to define different data channels for training." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "\n", "data_path = os.path.join(os.getcwd(), 'data')\n", "\n", "local_inputs = {\n", " 'train': 'file://{}/train'.format(data_path),\n", " 'validation': 'file://{}/validation'.format(data_path),\n", " 'eval': 'file://{}/eval'.format(data_path),\n", "}\n", "estimator.fit(local_inputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Run a baseline training job on SageMaker\n", "\n", "Now we run training jobs on SageMaker, starting with our baseline training job." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Configure metrics\n", "\n", "In addition to running the training job, Amazon SageMaker can retrieve training metrics directly from the logs and send them to Amazon CloudWatch.\n", "\n",
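"Each metric is extracted from the training log with a regular expression whose first capture group is the value reported to CloudWatch. As a quick sanity check of how such a pattern behaves, you can try a simplified equivalent of the `train:loss` pattern defined below with Python's `re` module (the log line here is only a hypothetical example of typical Keras progress output):\n", "\n", "```python\n", "import re\n", "\n", "# Hypothetical Keras progress line, similar to what the training job writes to its logs\n", "line = '- 35s 700us/step - loss: 1.0217 - accuracy: 0.6432 - val_loss: 0.9741 - val_accuracy: 0.6658'\n", "\n", "# Simplified equivalent of the 'train:loss' pattern defined below\n", "print(re.search('loss: ([0-9.]+) - accuracy:', line).group(1))  # prints 1.0217\n", "```\n", "\n",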
"Here, we define the metrics we would like to observe:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "metric_definitions = [\n", " {'Name': 'train:loss', 'Regex': '.*loss: ([0-9\\\\.]+) - accuracy: [0-9\\\\.]+.*'},\n", " {'Name': 'train:accuracy', 'Regex': '.*loss: [0-9\\\\.]+ - accuracy: ([0-9\\\\.]+).*'},\n", " {'Name': 'validation:accuracy', 'Regex': '.*step - loss: [0-9\\\\.]+ - accuracy: [0-9\\\\.]+ - val_loss: [0-9\\\\.]+ - val_accuracy: ([0-9\\\\.]+).*'},\n", " {'Name': 'validation:loss', 'Regex': '.*step - loss: [0-9\\\\.]+ - accuracy: [0-9\\\\.]+ - val_loss: ([0-9\\\\.]+) - val_accuracy: [0-9\\\\.]+.*'},\n", " {'Name': 'sec/steps', 'Regex': '.* - \\d+s (\\d+)[mu]s/step - loss: [0-9\\\\.]+ - accuracy: [0-9\\\\.]+ - val_loss: [0-9\\\\.]+ - val_accuracy: [0-9\\\\.]+'}\n", "]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once again, we create a TensorFlow estimator, with a couple of key modifications from last time:\n", "\n", "* `instance_type`: the instance type for training. We set this to `ml.p3.2xlarge` because we are training on SageMaker now. For a list of available instance types, see [the AWS documentation](https://aws.amazon.com/sagemaker/pricing/instance-types).\n", "* `metric_definitions`: the metrics (defined above) that we want sent to CloudWatch." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.tensorflow import TensorFlow\n", "\n", "hyperparameters = {'epochs': 10, 'batch-size': 256}\n", "\n", "estimator = TensorFlow(entry_point='cifar10_keras_main.py',\n", " source_dir='source_dir',\n", " metric_definitions=metric_definitions,\n", " hyperparameters=hyperparameters,\n", " role=role,\n", " framework_version='1.15.2',\n", " py_version='py3',\n", " instance_count=1,\n", " instance_type='ml.p3.2xlarge',\n", " base_job_name='cifar10-tf-on-demand')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Like before, we call `fit()` to start the SageMaker training job and pass the inputs in a dictionary to define different data channels for training. This time, we use the S3 URI of the data we uploaded earlier." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "inputs = {\n", " 'train': '{}/train'.format(dataset_uri),\n", " 'validation': '{}/validation'.format(dataset_uri),\n", " 'eval': '{}/eval'.format(dataset_uri),\n", "}\n", "\n", "estimator.fit(inputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Managed Spot Training with a TensorFlow Estimator\n", "\n", "For Managed Spot Training using a TensorFlow Estimator, we need to configure two things:\n", "1. Enable the `use_spot_instances` constructor arg - a simple self-explanatory boolean.\n", "2. Set the `max_wait` constructor arg - this is an int arg representing the amount of time you are willing to wait for Spot infrastructure to become available. Some instance types are harder to get at Spot prices and you may have to wait longer. You are not charged for time spent waiting for Spot infrastructure to become available; you're only charged for the actual compute time once Spot instances have been successfully procured.\n", "\n", "Normally, a third requirement would also be necessary here - modifying your code to ensure a regular checkpointing cadence - however, TensorFlow Estimators already do this, so no changes are necessary here.\n", "\n",
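"For illustration, here is a minimal sketch of how a training script can emit one checkpoint file per epoch to the local directory that SageMaker syncs to S3 (this is only a sketch; the actual `cifar10_keras_main.py` in `source_dir` may implement it differently):\n", "\n", "```python\n", "from tensorflow.keras.callbacks import ModelCheckpoint\n", "\n", "# /opt/ml/checkpoints is the local directory whose contents SageMaker copies\n", "# to the checkpoint_s3_uri configured on the estimator.\n", "checkpoint_callback = ModelCheckpoint(filepath='/opt/ml/checkpoints/checkpoint-{epoch}.h5')\n", "\n", "# model.fit(..., callbacks=[checkpoint_callback])  # writes one .h5 file per epoch\n", "```\n", "\n",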
"Checkpointing is highly recommended for Managed Spot Training jobs because Spot instances can be interrupted with short notice; using checkpoints to resume from the last interruption ensures you don't lose any progress made before the interruption.\n", "\n", "Feel free to toggle the `use_spot_instances` variable to see the effect of running the same job using regular (a.k.a. \"On Demand\") infrastructure.\n", "\n", "Note that `max_wait` can be set if and only if `use_spot_instances` is enabled and **must** be greater than or equal to `max_run`." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "use_spot_instances = True\n", "max_run = 600\n", "max_wait = 1200" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Simulating Spot interruption after 5 epochs\n", "\n", "Our full training job should run for 10 epochs.\n", "\n", "However, we will simulate a Spot interruption after 5 epochs.\n", "\n", "Note the `checkpoint_s3_uri` variable, which stores the S3 URI where checkpoints produced by the algorithm (if any) are persisted during training." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import uuid\n", "\n", "checkpoint_suffix = str(uuid.uuid4())[:8]\n", "checkpoint_s3_uri = 's3://{}/checkpoint-{}'.format(bucket, checkpoint_suffix)\n", "\n", "print('Checkpointing location: {}'.format(checkpoint_s3_uri))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The goal is for the checkpoint data to be copied to S3, so that when Spot capacity becomes available again, the training job can resume from the 6th epoch." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hyperparameters = {'epochs': 5, 'batch-size': 256}\n", "\n", "spot_estimator = TensorFlow(entry_point='cifar10_keras_main.py',\n", " source_dir='source_dir',\n", " metric_definitions=metric_definitions,\n", " hyperparameters=hyperparameters,\n", " role=role,\n", " framework_version='1.15.2',\n", " py_version='py3',\n", " instance_count=1,\n", " instance_type='ml.p3.2xlarge',\n", " base_job_name='cifar10-tf-spot-1st-run',\n", " checkpoint_s3_uri=checkpoint_s3_uri,\n", " use_spot_instances=use_spot_instances,\n", " max_run=max_run,\n", " max_wait=max_wait)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spot_estimator.fit(inputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Savings\n", "\n", "Towards the end of the job, you should see two lines of output printed:\n", "\n", "- `Training seconds: X`: This is the actual compute time your training job spent.\n", "- `Billable seconds: Y`: This is the time you will be billed for after Spot discounting is applied.\n", "\n", "If you enabled the `use_spot_instances` flag, you should see a notable difference between `X` and `Y`, signifying the cost savings you get for having chosen Managed Spot Training. This should be reflected in an additional line:\n", "- `Managed Spot Training savings: (1-Y/X)*100 %`\n", "\n",
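"For example (with hypothetical numbers): if the job reports `Training seconds: 500` and `Billable seconds: 150`, the savings line would show (1 - 150/500) * 100 = 70 %."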
 ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Analyze training job logs\n", "\n", "Analyzing the training job logs, we can see that the training job starts from the 1st epoch:\n", "\n", "```\n", "INFO:root:Starting training from epoch: 1\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### View the training job Checkpoint configuration\n", "\n", "We can now view the Checkpoint configuration of the training job directly in the SageMaker console.\n", "\n", "Log into the [SageMaker console](https://console.aws.amazon.com/sagemaker/home), choose the latest training job, and scroll down to the Checkpoint configuration section.\n", "\n", "Choose the S3 output path link and you'll be directed to the S3 bucket where the checkpoint data is saved.\n", "\n", "You can see there are 5 files there:\n", "\n", "```\n", "checkpoint-1.h5\n", "checkpoint-2.h5\n", "checkpoint-3.h5\n", "checkpoint-4.h5\n", "checkpoint-5.h5\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Continue training after Spot capacity is resumed\n", "\n", "Now we simulate a situation where Spot capacity is resumed.\n", "\n", "We will start a training job again, this time with 10 epochs.\n", "\n", "What we expect is that the training job will start from the 6th epoch.\n", "\n", "When the training job starts, it checks the checkpoint S3 location for checkpoint data. If any is found, it is copied to `/opt/ml/checkpoints` in the training container.\n", "\n", "In the code, you can see the `load_model_from_checkpoints` function, which loads the checkpoint data:\n", "\n", "```python\n", "def load_model_from_checkpoints(checkpoint_path):\n", " checkpoint_files = [file for file in os.listdir(checkpoint_path) if file.endswith('.' 
+ 'h5')]\n", " logging.info('------------------------------------------------------')\n", " logging.info(\"Available checkpoint files: {}\".format(checkpoint_files))\n", " epoch_numbers = [re.search('(\.*[0-9])(?=\.)',file).group() for file in checkpoint_files]\n", " \n", " max_epoch_number = max(epoch_numbers)\n", " max_epoch_index = epoch_numbers.index(max_epoch_number)\n", " max_epoch_filename = checkpoint_files[max_epoch_index]\n", "\n", " logging.info('Latest epoch checkpoint file name: {}'.format(max_epoch_filename))\n", " logging.info('Resuming training from epoch: {}'.format(int(max_epoch_number)+1))\n", " logging.info('------------------------------------------------------')\n", " \n", " resumed_model_from_checkpoints = load_model(f'{checkpoint_path}/{max_epoch_filename}')\n", " return resumed_model_from_checkpoints, int(max_epoch_number)\n", "```\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "hyperparameters = {'epochs': 10, 'batch-size': 256}\n", "\n", "spot_estimator = TensorFlow(entry_point='cifar10_keras_main.py',\n", " source_dir='source_dir',\n", " metric_definitions=metric_definitions,\n", " hyperparameters=hyperparameters,\n", " role=role,\n", " framework_version='1.15.2',\n", " py_version='py3',\n", " instance_count=1,\n", " instance_type='ml.p3.2xlarge',\n", " base_job_name='cifar10-tf-spot-2nd-run',\n", " checkpoint_s3_uri=checkpoint_s3_uri,\n", " use_spot_instances=use_spot_instances,\n", " max_run=max_run,\n", " max_wait=max_wait)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "spot_estimator.fit(inputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Analyze training job logs\n", "\n", "Analyzing the training job logs, we can see that this time the training job starts from the 6th epoch.\n", "\n", "We can see the output of the `load_model_from_checkpoints` function:\n", "\n", "```\n", "INFO:root:------------------------------------------------------\n", "INFO:root:Available checkpoint files: ['checkpoint-1.h5', 'checkpoint-4.h5', 'checkpoint-3.h5', 'checkpoint-2.h5', 'checkpoint-5.h5']\n", "INFO:root:Latest epoch checkpoint file name: checkpoint-5.h5\n", "INFO:root:Resuming training from epoch: 6\n", "INFO:root:------------------------------------------------------\n", "```\n", "\n", "Going further down in the logs, we can see the following line, indicating that the training job starts from the 6th epoch:\n", "```\n", "INFO:root:Starting training from epoch: 6\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### View the training job Checkpoint configuration after the job completed 10 epochs\n", "\n", "We can now view the Checkpoint configuration of the training job directly in the SageMaker console.\n", "\n", "Log into the [SageMaker console](https://console.aws.amazon.com/sagemaker/home), choose the latest training job, and scroll down to the Checkpoint configuration section.\n", "\n", "Choose the S3 output path link and you'll be directed to the S3 bucket where the checkpoint data is saved.\n", "\n",
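"Alternatively, you can list the checkpoint files directly from the notebook. Here is a minimal sketch that uses the SageMaker SDK's `S3Downloader` together with the `checkpoint_s3_uri` defined earlier (the exact listing may differ from what the console shows):\n", "\n", "```python\n", "from sagemaker.s3 import S3Downloader\n", "\n", "# List the checkpoint objects that the training job synced to S3\n", "for uri in sorted(S3Downloader.list(checkpoint_s3_uri)):\n", "    print(uri)\n", "```\n", "\n",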
\n", "\n", "Choose the S3 output path link and you'll be directed to the S3 bucket were checkpointing data is saved.\n", "\n", "You can see there are 10 files there:\n", "\n", "```python\n", "checkpoint-1.h5\n", "checkpoint-2.h5\n", "checkpoint-3.h5\n", "checkpoint-4.h5\n", "checkpoint-5.h5\n", "checkpoint-6.h5\n", "checkpoint-7.h5\n", "checkpoint-8.h5\n", "checkpoint-9.h5\n", "checkpoint-10.h5\n", "```\n", "\n", "You'll be able to see that the dates of the first five checkpoint files (1-5), and the second group (6-10) are grouped together, indicating the different time where the training job was run." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Deploy the trained model\n", "\n", "After we train our model, we can deploy it to a SageMaker Endpoint, which serves prediction requests in real-time. To do so, we simply call `deploy()` on our estimator, passing in the desired number of instances and instance type for the endpoint.\n", "\n", "Because we're using TensorFlow Serving for deployment, our training script saves the model in TensorFlow's SavedModel format. For more details, see [this blog post on deploying Keras and TF models in SageMaker](https://aws.amazon.com/blogs/machine-learning/deploy-trained-keras-or-tensorflow-models-using-amazon-sagemaker)." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "predictor = spot_estimator.deploy(initial_instance_count=1, instance_type='ml.m4.xlarge')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Invoke the endpoint\n", "\n", "To verify the that the endpoint is in service, we generate some random data in the correct shape and get a prediction." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "\n", "data = np.random.randn(1, 32, 32, 3)\n", "print('Predicted class: {}'.format(np.argmax(predictor.predict(data)['predictions'])))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's use the test dataset for predictions." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from keras.datasets import cifar10\n", "\n", "(x_train, y_train), (x_test, y_test) = cifar10.load_data()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With the data loaded, we can use it for predictions:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from keras.preprocessing.image import ImageDataGenerator\n", "\n", "def predict(data):\n", " predictions = predictor.predict(data)['predictions']\n", " return predictions\n", "\n", "\n", "predicted = []\n", "actual = []\n", "batches = 0\n", "batch_size = 128\n", "\n", "datagen = ImageDataGenerator()\n", "for data in datagen.flow(x_test, y_test, batch_size=batch_size):\n", " for i, prediction in enumerate(predict(data[0])):\n", " predicted.append(np.argmax(prediction))\n", " actual.append(data[1][i][0])\n", "\n", " batches += 1\n", " if batches >= len(x_test) / batch_size:\n", " break" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With the predictions, we calculate our model accuracy and create a confusion matrix." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sklearn.metrics import accuracy_score\n", "\n", "accuracy = accuracy_score(y_pred=predicted, y_true=actual)\n", "display('Average accuracy: {}%'.format(round(accuracy * 100, 2)))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%matplotlib inline\n", "import matplotlib.pyplot as plt\n", "import pandas as pd\n", "import seaborn as sn\n", "from sklearn.metrics import confusion_matrix\n", "\n", "cm = confusion_matrix(y_pred=predicted, y_true=actual)\n", "cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]\n", "sn.set(rc={'figure.figsize': (11.7,8.27)})\n", "sn.set(font_scale=1.4) # for label size\n", "sn.heatmap(cm, annot=True, annot_kws={\"size\": 10}) # font size" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Aided by the colors of the heatmap, we can use this confusion matrix to understand how well the model performed for each label." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Cleanup\n", "\n", "To avoid incurring extra charges to your AWS account, let's delete the endpoint we created:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor.delete_endpoint()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_tensorflow_p36", "language": "python", "name": "conda_tensorflow_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" }, "notice": "Copyright 2017-2020 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License." }, "nbformat": 4, "nbformat_minor": 2 }