{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## TensorFlow 2 Complete Project Workflow in Amazon SageMaker\n", "### Data Preprocessing -> Code Prototyping -> Automatic Model Tuning -> Deployment\n", " \n", "1. [Introduction](#Introduction)\n", "2. [SageMaker Processing for dataset transformation](#SageMakerProcessing)\n", "3. [Local Mode training](#LocalModeTraining)\n", "4. [Local Mode endpoint](#LocalModeEndpoint)\n", "5. [SageMaker hosted training](#SageMakerHostedTraining)\n", "6. [Automatic Model Tuning](#AutomaticModelTuning)\n", "7. [SageMaker hosted endpoint](#SageMakerHostedEndpoint)\n", "8. [Workflow Automation with the Step Functions Data Science SDK](#WorkflowAutomation)\n", " 1. [Add an IAM policy to your SageMaker role](#IAMPolicy)\n", " 2. [Create an execution role for Step Functions](#CreateExecutionRole)\n", " 3. [Set up a TrainingPipeline](#TrainingPipeline)\n", " 4. [Visualizing the workflow](#VisualizingWorkflow)\n", " 5. [Creating and executing the pipeline](#CreatingExecutingPipeline)\n", " 6. [Cleanup](#Cleanup)\n", "9. [Extensions](#Extensions)\n", "\n", "\n", "### ***Prerequisite: To run the Local Mode sections of this example, use a SageMaker Notebook Instance; otherwise skip those sections (for example if you're using SageMaker Studio instead).***\n", "\n", " \n", "## Introduction \n", "\n", "If you are using TensorFlow 2, you can use the Amazon SageMaker prebuilt TensorFlow 2 container with training scripts similar to those you would use outside SageMaker. This feature is named Script Mode. Using Script Mode and other SageMaker features, you can build a complete workflow for a TensorFlow 2 project. This notebook presents such a workflow, including all key steps such as preprocessing data with SageMaker Processing, code prototyping with SageMaker Local Mode training and inference, and production-ready model training and deployment with SageMaker hosted training and inference. Automatic Model Tuning in SageMaker is used to tune the model's hyperparameters. Additionally, the [AWS Step Functions Data Science SDK](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/readmelink.html) is used to automate the main training and deployment steps for use in a production workflow outside notebooks. \n", "\n", "To enable you to run this notebook within a reasonable time (typically less than an hour), this notebook's use case is a straightforward regression task: predicting house prices based on the well-known Boston Housing dataset. This public dataset contains 13 features regarding housing stock of towns in the Boston area. Features include average number of rooms, accessibility to radial highways, adjacency to the Charles River, etc. \n", "\n", "To begin, we'll import some necessary packages and set up directories for local training and test data. We'll also set up a SageMaker Session to perform various operations, and specify an Amazon S3 bucket to hold input data and output. The default bucket used here is created by SageMaker if it doesn't already exist, and named in accordance with the AWS account ID and AWS Region. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import sagemaker\n", "import tensorflow as tf\n", "\n", "sess = sagemaker.Session()\n", "bucket = sess.default_bucket() \n", "\n", "data_dir = os.path.join(os.getcwd(), 'data')\n", "os.makedirs(data_dir, exist_ok=True)\n", "\n", "train_dir = os.path.join(os.getcwd(), 'data/train')\n", "os.makedirs(train_dir, exist_ok=True)\n", "\n", "test_dir = os.path.join(os.getcwd(), 'data/test')\n", "os.makedirs(test_dir, exist_ok=True)\n", "\n", "raw_dir = os.path.join(os.getcwd(), 'data/raw')\n", "os.makedirs(raw_dir, exist_ok=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# SageMaker Processing for dataset transformation \n", "\n", "Next, we'll import the dataset and transform it with SageMaker Processing, which can be used to process terabytes of data in a SageMaker-managed cluster separate from the instance running your notebook server. In a typical SageMaker workflow, notebooks are only used for prototyping and can be run on relatively inexpensive and less powerful instances, while processing, training and model hosting tasks are run on separate, more powerful SageMaker-managed instances. SageMaker Processing includes off-the-shelf support for Scikit-learn, as well as a Bring Your Own Container option, so it can be used with many different data transformation technologies and tasks. \n", "\n", "First we'll load the Boston Housing dataset, save the raw feature data and upload it to Amazon S3 for transformation by SageMaker Processing. We'll also save the labels for training and testing." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "from tensorflow.python.keras.datasets import boston_housing\n", "from sklearn.preprocessing import StandardScaler\n", "\n", "(x_train, y_train), (x_test, y_test) = boston_housing.load_data()\n", "\n", "np.save(os.path.join(raw_dir, 'x_train.npy'), x_train)\n", "np.save(os.path.join(raw_dir, 'x_test.npy'), x_test)\n", "np.save(os.path.join(train_dir, 'y_train.npy'), y_train)\n", "np.save(os.path.join(test_dir, 'y_test.npy'), y_test)\n", "s3_prefix = 'tf-2-workflow'\n", "rawdata_s3_prefix = '{}/data/raw'.format(s3_prefix)\n", "raw_s3 = sess.upload_data(path='./data/raw/', key_prefix=rawdata_s3_prefix)\n", "print(raw_s3)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To use SageMaker Processing, simply supply a Python data preprocessing script as shown below. For this example, we're using a SageMaker prebuilt Scikit-learn container, which includes many common functions for processing data. There are few limitations on what kinds of code and operations you can run, and only a minimal contract: input and output data must be placed in specified directories. If this is done, SageMaker Processing automatically loads the input data from S3 and uploads transformed data back to S3 when the job is complete." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile preprocessing.py\n", "\n", "import glob\n", "import numpy as np\n", "import os\n", "from sklearn.preprocessing import StandardScaler\n", "\n", "if __name__=='__main__':\n", " \n", " input_files = glob.glob('{}/*.npy'.format('/opt/ml/processing/input'))\n", " print('\\nINPUT FILE LIST: \\n{}\\n'.format(input_files))\n", " scaler = StandardScaler()\n", " for file in input_files:\n", " if 'x_' in file:\n", " X = file\n", " elif 'y_' in file:\n", " y = file\n", " raw_x = np.load(X)\n", " raw_y = np.load(y)\n", " transformed = scaler.fit_transform(raw_x, raw_y)\n", " for file in input_files:\n", " raw = np.load(file)\n", " transformed = scaler.fit_transform(raw)\n", " if 'train' in file:\n", " output_path = os.path.join('/opt/ml/processing/train', 'x_train.npy')\n", " np.save(output_path, transformed)\n", " print('SAVED TRANSFORMED TRAINING DATA FILE\\n')\n", " else:\n", " output_path = os.path.join('/opt/ml/processing/test', 'x_test.npy')\n", " np.save(output_path, transformed)\n", " print('SAVED TRANSFORMED TEST DATA FILE\\n')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before starting the SageMaker Processing job, we instantiate a `SKLearnProcessor` object. This object allows you to specify the instance type to use in the job, as well as how many instances. Although the Boston Housing dataset is quite small, we'll use two instances to showcase how easy it is to spin up a cluster for SageMaker Processing. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker import get_execution_role\n", "from sagemaker.sklearn.processing import SKLearnProcessor\n", "\n", "sklearn_processor = SKLearnProcessor(framework_version='0.20.0',\n", " role=get_execution_role(),\n", " instance_type='ml.m5.xlarge',\n", " instance_count=2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We're now ready to run the Processing job. To enable distributing the data files equally among the instances, we specify the `ShardedByS3Key` distribution type in the `ProcessingInput` object. This ensures that if we have `n` instances, each instance will receive `1/n` files from the specified S3 bucket. It may take around 3 minutes for the following code cell to run, mainly to set up the cluster. At the end of the job, the cluster automatically will be torn down by SageMaker. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.processing import ProcessingInput, ProcessingOutput\n", "from time import gmtime, strftime \n", "\n", "processing_job_name = \"tf-2-workflow-{}\".format(strftime(\"%d-%H-%M-%S\", gmtime()))\n", "output_destination = 's3://{}/{}/data'.format(bucket, s3_prefix)\n", "\n", "sklearn_processor.run(code='preprocessing.py',\n", " job_name=processing_job_name,\n", " inputs=[ProcessingInput(\n", " source=raw_s3,\n", " destination='/opt/ml/processing/input',\n", " s3_data_distribution_type='ShardedByS3Key')],\n", " outputs=[ProcessingOutput(output_name='train',\n", " destination='{}/train'.format(output_destination),\n", " source='/opt/ml/processing/train'),\n", " ProcessingOutput(output_name='test',\n", " destination='{}/test'.format(output_destination),\n", " source='/opt/ml/processing/test')])\n", "\n", "preprocessing_job_description = sklearn_processor.jobs[-1].describe()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "In the log output of the SageMaker Processing job above, you should be able to see logs in two different colors for the two different instances, and that each instance received different files. Without the `ShardedByS3Key` distribution type, each instance would have received a copy of **all** files. By spreading the data equally among `n` instances, you should receive a speedup by approximately a factor of `n` for most stateless data transformations. After saving the job results locally, we'll move on to prototyping training and inference code with Local Mode." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_in_s3 = '{}/train/x_train.npy'.format(output_destination)\n", "test_in_s3 = '{}/test/x_test.npy'.format(output_destination)\n", "!aws s3 cp {train_in_s3} ./data/train/x_train.npy\n", "!aws s3 cp {test_in_s3} ./data/test/x_test.npy" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Local Mode training \n", "\n", "Local Mode in Amazon SageMaker is a convenient way to make sure your code is working locally as expected before moving on to full scale, hosted training in a separate, more powerful SageMaker-managed cluster. To train in Local Mode, it is necessary to have docker-compose or nvidia-docker-compose (for GPU instances) installed. Running the following commands will install docker-compose or nvidia-docker-compose, and configure the notebook environment for you." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!wget -q https://raw.githubusercontent.com/aws-samples/amazon-sagemaker-script-mode/master/local_mode_setup.sh\n", "!wget -q https://raw.githubusercontent.com/aws-samples/amazon-sagemaker-script-mode/master/daemon.json \n", "!/bin/bash ./local_mode_setup.sh" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we'll set up a TensorFlow Estimator for Local Mode training. Key parameters for the Estimator include:\n", "\n", "- `train_instance_type`: the kind of hardware on which training will run. In the case of Local Mode, we simply set this parameter to `local` to invoke Local Mode training on the CPU, or to `local_gpu` if the instance has a GPU. \n", "- `git_config`: to make sure training scripts are source controlled for coordinated, shared use by a team, the Estimator can pull in the code from a Git repository rather than local directories. 
\n", "- Other parameters of note: the algorithm’s hyperparameters, which are passed in as a dictionary, and a Boolean parameter indicating that we are using Script Mode. \n", "\n", "Recall that we are using Local Mode here mainly to make sure our code is working. Accordingly, instead of performing a full cycle of training with many epochs (passes over the full dataset), we'll train only for a small number of epochs just to confirm the code is working properly and avoid wasting full-scale training time unnecessarily." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.tensorflow import TensorFlow\n", "\n", "git_config = {'repo': 'https://github.com/aws-samples/amazon-sagemaker-script-mode', \n", " 'branch': 'master'}\n", "\n", "model_dir = '/opt/ml/model'\n", "train_instance_type = 'local'\n", "hyperparameters = {'epochs': 5, 'batch_size': 128, 'learning_rate': 0.01}\n", "local_estimator = TensorFlow(git_config=git_config,\n", " source_dir='tf-2-workflow/train_model',\n", " entry_point='train.py',\n", " model_dir=model_dir,\n", " instance_type=train_instance_type,\n", " instance_count=1,\n", " hyperparameters=hyperparameters,\n", " role=sagemaker.get_execution_role(),\n", " base_job_name='tf-2-workflow',\n", " framework_version='2.2',\n", " py_version='py37',\n", " script_mode=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The `fit` method call below starts the Local Mode training job. Metrics for training will be logged below the code, inside the notebook cell. You should observe the validation loss decrease substantially over the five epochs, with no training errors, which is a good indication that our training code is working as expected." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "inputs = {'train': f'file://{train_dir}',\n", " 'test': f'file://{test_dir}'}\n", "\n", "local_estimator.fit(inputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Local Mode endpoint \n", "\n", "While Amazon SageMaker’s Local Mode training is very useful to make sure your training code is working before moving on to full scale training, it also would be useful to have a convenient way to test your model locally before incurring the time and expense of deploying it to production. One possibility is to fetch the TensorFlow SavedModel artifact or a model checkpoint saved in Amazon S3, and load it in your notebook for testing. However, an even easier way to do this is to use the SageMaker Python SDK to do this work for you by setting up a Local Mode endpoint.\n", "\n", "More specifically, the Estimator object from the Local Mode training job can be used to deploy a model locally. With one exception, this code is the same as the code you would use to deploy to production. In particular, all you need to do is invoke the local Estimator's deploy method, and similarly to Local Mode training, specify the instance type as either `local_gpu` or `local` depending on whether your notebook is on a GPU instance or CPU instance. \n", "\n", "Just in case there are other inference containers running in Local Mode, we'll stop them to avoid conflict before deploying our new model locally." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!docker container stop $(docker container ls -aq) >/dev/null" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The following single line of code deploys the model locally in the SageMaker TensorFlow Serving container: " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "local_predictor = local_estimator.deploy(initial_instance_count=1, instance_type='local')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To get predictions from the Local Mode endpoint, simply invoke the Predictor's predict method." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "local_results = local_predictor.predict(x_test[:10])['predictions']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a sanity check, the predictions can be compared against the actual target values." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "local_preds_flat_list = [float('%.1f'%(item)) for sublist in local_results for item in sublist]\n", "print('predictions: \\t{}'.format(np.array(local_preds_flat_list)))\n", "print('target values: \\t{}'.format(y_test[:10].round(decimals=1)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We only trained the model for a few epochs and there is much room for improvement, but the predictions so far should at least appear reasonably within the ballpark. \n", "\n", "To avoid having the SageMaker TensorFlow Serving container indefinitely running locally, simply gracefully shut it down by calling the `delete_endpoint` method of the Predictor object." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "local_predictor.delete_endpoint()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SageMaker hosted training \n", "\n", "Now that we've confirmed our code is working locally, we can move on to use SageMaker's hosted training functionality. Hosted training is preferred for doing actual training, especially large-scale, distributed training. Unlike Local Mode training, for hosted training the actual training itself occurs not on the notebook instance, but on a separate cluster of machines managed by SageMaker. Before starting hosted training, the data must be in S3, or an EFS or FSx for Lustre file system. We'll upload to S3 now, and confirm the upload was successful." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_prefix = 'tf-2-workflow'\n", "\n", "traindata_s3_prefix = '{}/data/train'.format(s3_prefix)\n", "testdata_s3_prefix = '{}/data/test'.format(s3_prefix)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_s3 = sess.upload_data(path='./data/train/', key_prefix=traindata_s3_prefix)\n", "test_s3 = sess.upload_data(path='./data/test/', key_prefix=testdata_s3_prefix)\n", "\n", "inputs = {'train':train_s3, 'test': test_s3}\n", "\n", "print(inputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We're now ready to set up an Estimator object for hosted training. It is similar to the Local Mode Estimator, except the `train_instance_type` has been set to a SageMaker ML instance type instead of `local` for Local Mode. 
Also, since we know our code is working now, we'll train for a larger number of epochs with the expectation that model training will converge to an improved, lower validation loss.\n", "\n", "With these two changes, we simply call `fit` to start the actual hosted training." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_instance_type = 'ml.c5.xlarge'\n", "hyperparameters = {'epochs': 30, 'batch_size': 128, 'learning_rate': 0.01}\n", "\n", "git_config = {'repo': 'https://github.com/aws-samples/amazon-sagemaker-script-mode', \n", "              'branch': 'master'}\n", "\n", "estimator = TensorFlow(git_config=git_config,\n", "                       source_dir='tf-2-workflow/train_model',\n", "                       entry_point='train.py',\n", "                       model_dir=model_dir,\n", "                       instance_type=train_instance_type,\n", "                       instance_count=1,\n", "                       hyperparameters=hyperparameters,\n", "                       role=sagemaker.get_execution_role(),\n", "                       base_job_name='tf-2-workflow',\n", "                       framework_version='2.2',\n", "                       py_version='py37')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After starting the hosted training job with the `fit` method call below, you should observe the training converge over the larger number of epochs to a validation loss considerably lower than the one achieved in the shorter Local Mode training job. Can we do better? We'll look into a way to do so in the **Automatic Model Tuning** section below. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "estimator.fit(inputs)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As with Local Mode training, hosted training produces a model saved in S3 that we can retrieve. This is an example of the modularity of SageMaker: having trained the model in SageMaker, you can now take the model out of SageMaker and run it anywhere else. Alternatively, you can deploy the model into a production-ready environment using SageMaker's hosted endpoints functionality, as shown in the **SageMaker hosted endpoint** section below.\n", "\n", "Retrieving the model from S3 is very easy: the hosted training estimator you created above stores a reference to the model's location in S3. You simply copy the model from S3 using the estimator's `model_data` property and extract it to inspect the contents." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!aws s3 cp {estimator.model_data} ./model/model.tar.gz" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The extracted archive should include the assets required by TensorFlow Serving to load the model and serve it, including a .pb file: " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!tar -xvzf ./model/model.tar.gz -C ./model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Automatic Model Tuning \n", "\n", "So far we have simply run one Local Mode training job and one hosted training job without any real attempt to tune hyperparameters to produce a better model, other than increasing the number of epochs. Selecting the right hyperparameter values to train your model can be difficult, and typically is very time-consuming if done manually. 
The right combination of hyperparameters is dependent on your data and algorithm; some algorithms have many different hyperparameters that can be tweaked; some are very sensitive to the hyperparameter values selected; and most have a non-linear relationship between model fit and hyperparameter values. SageMaker Automatic Model Tuning helps automate the hyperparameter tuning process: it runs multiple training jobs with different hyperparameter combinations to find the set with the best model performance.\n", "\n", "We begin by specifying the hyperparameters we wish to tune, and the range of values over which to tune each one. We also must specify an objective metric to be optimized: in this use case, we'd like to minimize the validation loss." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.tuner import IntegerParameter, CategoricalParameter, ContinuousParameter, HyperparameterTuner\n", "\n", "hyperparameter_ranges = {\n", "    'learning_rate': ContinuousParameter(0.001, 0.2, scaling_type=\"Logarithmic\"),\n", "    'epochs': IntegerParameter(10, 50),\n", "    'batch_size': IntegerParameter(64, 256),\n", "}\n", "\n", "metric_definitions = [{'Name': 'loss',\n", "                       'Regex': ' loss: ([0-9\\\\.]+)'},\n", "                      {'Name': 'val_loss',\n", "                       'Regex': ' val_loss: ([0-9\\\\.]+)'}]\n", "\n", "objective_metric_name = 'val_loss'\n", "objective_type = 'Minimize'" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next we specify a HyperparameterTuner object that takes the above definitions as parameters. Each tuning job must be given a budget: a maximum number of training jobs. The tuning job will complete after that many training jobs have been executed. \n", "\n", "We also can specify how much parallelism to employ, in this case five jobs, meaning that the tuning job will complete after three rounds of five parallel jobs have finished. For the default Bayesian Optimization tuning strategy used here, the tuning search is informed by the results of previous groups of training jobs, so we don't run all of the jobs in parallel, but rather divide the jobs into groups of parallel jobs. There is a trade-off: using more parallel jobs will finish tuning sooner, but likely will sacrifice tuning search accuracy. \n", "\n", "Now we can launch a hyperparameter tuning job by calling the `fit` method of the HyperparameterTuner object. The tuning job may take around 10 minutes to finish. While you're waiting, the status of the tuning job, including metadata and results for individual training jobs within the tuning job, can be checked in the SageMaker console in the **Hyperparameter tuning jobs** panel. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tuner = HyperparameterTuner(estimator,\n", "                            objective_metric_name,\n", "                            hyperparameter_ranges,\n", "                            metric_definitions,\n", "                            max_jobs=15,\n", "                            max_parallel_jobs=5,\n", "                            objective_type=objective_type)\n", "\n", "tuning_job_name = \"tf-2-workflow-{}\".format(strftime(\"%d-%H-%M-%S\", gmtime()))\n", "tuner.fit(inputs, job_name=tuning_job_name)\n", "tuner.wait()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "After the tuning job is finished, we can use the `HyperparameterTuningJobAnalytics` object from the SageMaker Python SDK to list the top 5 training jobs with the best performance. 
Although the results vary from tuning job to tuning job, the best validation loss from the tuning job (under the FinalObjectiveValue column) likely will be substantially lower than the validation loss from the hosted training job above, where we did not perform any tuning other than manually increasing the number of epochs once. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tuner_metrics = sagemaker.HyperparameterTuningJobAnalytics(tuning_job_name)\n", "tuner_metrics.dataframe().sort_values(['FinalObjectiveValue'], ascending=True).head(5)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The total training time and the status of each training job can be checked with the following lines of code. Because automatic early stopping is off by default, all the training jobs should complete normally. For an example of a more in-depth analysis of a tuning job, see the official SageMaker sample [HPO_Analyze_TuningJob_Results.ipynb](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/hyperparameter_tuning/analyze_results/HPO_Analyze_TuningJob_Results.ipynb) notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "total_time = tuner_metrics.dataframe()['TrainingElapsedTimeSeconds'].sum() / 3600\n", "print(\"The total training time is {:.2f} hours\".format(total_time))\n", "tuner_metrics.dataframe()['TrainingJobStatus'].value_counts()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## SageMaker hosted endpoint \n", "\n", "Assuming the best model from the tuning job is better than the model produced by the individual hosted training job above, we could now easily deploy that model to production. A convenient option is to use a SageMaker hosted endpoint, which serves real-time predictions from the trained model (Batch Transform jobs are also available for asynchronous, offline predictions on large datasets). The endpoint will retrieve the TensorFlow SavedModel created during training and deploy it within a SageMaker TensorFlow Serving container. This all can be accomplished with one line of code. \n", "\n", "More specifically, by calling the `deploy` method of the HyperparameterTuner object we instantiated above, we can directly deploy the best model from the tuning job to a SageMaker hosted endpoint. It will take several minutes longer to deploy the model to the hosted endpoint than it did for the Local Mode endpoint, which is more useful for fast prototyping of inference code. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "tuning_predictor = tuner.deploy(initial_instance_count=1, instance_type='ml.m5.xlarge')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can compare the predictions generated by this endpoint with those generated locally by the Local Mode endpoint: " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "results = tuning_predictor.predict(x_test[:10])['predictions'] \n", "flat_list = [float('%.1f' % item) for sublist in results for item in sublist]\n", "print('predictions: \\t{}'.format(np.array(flat_list)))\n", "print('target values: \\t{}'.format(y_test[:10].round(decimals=1)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "To avoid billing charges from stray resources, you can delete the prediction endpoint to release its associated instance(s)."
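] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before tearing the endpoint down, you can also quantify prediction quality with a quick error metric. This is a rough sanity check over just ten test points, not a proper evaluation (the targets are in thousands of dollars):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Root mean squared error between the endpoint's predictions and the\n", "# actual target values for the first ten test samples.\n", "rmse = np.sqrt(np.mean((np.array(flat_list) - y_test[:10]) ** 2))\n", "print('RMSE over 10 test samples: {:.2f}'.format(rmse))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "With that check done, delete the endpoint:"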
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sess.delete_endpoint(tuning_predictor.endpoint_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Workflow Automation with the AWS Step Functions Data Science SDK \n", "\n", "In the previous parts of this notebook, we prototyped various steps of a TensorFlow project within the notebook itself. Notebooks are great for prototyping, but generally are not used in production-ready machine learning pipelines. For example, a simple pipeline in SageMaker includes the following steps: \n", "\n", "1. Training the model.\n", "2. Creating a SageMaker Model object that wraps the model artifact for serving.\n", "3. Creating a SageMaker Endpoint Configuration specifying how the model should be served (e.g. hardware type and amount).\n", "4. Deploying the trained model to the configured SageMaker Endpoint. \n", "\n", "The AWS Step Functions Data Science SDK automates the process of creating and running these kinds of workflows using AWS Step Functions and SageMaker. It does this by allowing you to create workflows using short, simple Python scripts that define workflow steps and chain them together. Under the hood, all the workflow steps are coordinated by AWS Step Functions without any need for you to manage the underlying infrastructure. \n", "\n", "To begin, install the Step Functions Data Science SDK: " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import sys\n", "\n", "!{sys.executable} -m pip install --quiet --upgrade stepfunctions" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Add an IAM policy to your SageMaker role \n", "\n", "**If you are running this notebook on an Amazon SageMaker notebook instance**, the IAM role assumed by your notebook instance needs permission to create and run workflows in AWS Step Functions. To provide this permission to the role, do the following.\n", "\n", "1. Open the Amazon [SageMaker console](https://console.aws.amazon.com/sagemaker/). \n", "2. Select **Notebook instances** and choose the name of your notebook instance\n", "3. Under **Permissions and encryption** select the role ARN to view the role on the IAM console\n", "4. Choose **Attach policies** and search for `AWSStepFunctionsFullAccess`.\n", "5. Select the check box next to `AWSStepFunctionsFullAccess` and choose **Attach policy**\n", "\n", "If you are running this notebook in a local environment, the SDK will use your configured AWS CLI configuration. For more information, see [Configuring the AWS CLI](https://docs.aws.amazon.com/cli/latest/userguide/cli-chap-configure.html).\n", "\n", "\n", "### Create an execution role for Step Functions \n", "\n", "You also need to create an execution role for Step Functions to enable that service to access SageMaker and other service functionality.\n", "\n", "1. Go to the [IAM console](https://console.aws.amazon.com/iam/)\n", "2. Select **Roles** and then **Create role**.\n", "3. Under **Choose the service that will use this role** select **Step Functions**\n", "4. Choose **Next** until you can enter a **Role name**\n", "5. Enter a name such as `StepFunctionsWorkflowExecutionRole` and then select **Create role**\n", "\n", "\n", "Select your newly create role and attach a policy to it. The following steps attach a policy that provides full access to Step Functions, however as a good practice you should only provide access to the resources you need. \n", "\n", "1. 
Under the **Permissions** tab, click **Add inline policy**\n", "2. Enter the following in the **JSON** tab\n", "\n", "```json\n", "{\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\n", " \"sagemaker:CreateTransformJob\",\n", " \"sagemaker:DescribeTransformJob\",\n", " \"sagemaker:StopTransformJob\",\n", " \"sagemaker:CreateTrainingJob\",\n", " \"sagemaker:DescribeTrainingJob\",\n", " \"sagemaker:StopTrainingJob\",\n", " \"sagemaker:CreateHyperParameterTuningJob\",\n", " \"sagemaker:DescribeHyperParameterTuningJob\",\n", " \"sagemaker:StopHyperParameterTuningJob\",\n", " \"sagemaker:CreateModel\",\n", " \"sagemaker:CreateEndpointConfig\",\n", " \"sagemaker:CreateEndpoint\",\n", " \"sagemaker:DeleteEndpointConfig\",\n", " \"sagemaker:DeleteEndpoint\",\n", " \"sagemaker:UpdateEndpoint\",\n", " \"sagemaker:ListTags\",\n", " \"lambda:InvokeFunction\",\n", " \"sqs:SendMessage\",\n", " \"sns:Publish\",\n", " \"ecs:RunTask\",\n", " \"ecs:StopTask\",\n", " \"ecs:DescribeTasks\",\n", " \"dynamodb:GetItem\",\n", " \"dynamodb:PutItem\",\n", " \"dynamodb:UpdateItem\",\n", " \"dynamodb:DeleteItem\",\n", " \"batch:SubmitJob\",\n", " \"batch:DescribeJobs\",\n", " \"batch:TerminateJob\",\n", " \"glue:StartJobRun\",\n", " \"glue:GetJobRun\",\n", " \"glue:GetJobRuns\",\n", " \"glue:BatchStopJobRun\"\n", " ],\n", " \"Resource\": \"*\"\n", " },\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\n", " \"iam:PassRole\"\n", " ],\n", " \"Resource\": \"*\",\n", " \"Condition\": {\n", " \"StringEquals\": {\n", " \"iam:PassedToService\": \"sagemaker.amazonaws.com\"\n", " }\n", " }\n", " },\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Action\": [\n", " \"events:PutTargets\",\n", " \"events:PutRule\",\n", " \"events:DescribeRule\"\n", " ],\n", " \"Resource\": [\n", " \"arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule\",\n", " \"arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTransformJobsRule\",\n", " \"arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTuningJobsRule\",\n", " \"arn:aws:events:*:*:rule/StepFunctionsGetEventsForECSTaskRule\",\n", " \"arn:aws:events:*:*:rule/StepFunctionsGetEventsForBatchJobsRule\"\n", " ]\n", " }\n", " ]\n", "}\n", "```\n", "\n", "3. Choose **Review policy** and give the policy a name such as `StepFunctionsWorkflowExecutionPolicy`\n", "4. Choose **Create policy**. You will be redirected to the details page for the role.\n", "5. Copy the **Role ARN** at the top of the **Summary**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Set up a TrainingPipeline \n", "\n", "Although the AWS Step Functions Data Science SDK provides various primitives to build up pipelines from scratch, it also provides prebuilt templates for common workflows, including a [TrainingPipeline](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/pipelines.html#stepfunctions.template.pipeline.train.TrainingPipeline) object to simplify creation of a basic pipeline that includes model training and deployment. 
\n", "\n", "The following code cell configures a `pipeline` object with the necessary parameters to define such a simple pipeline:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import stepfunctions\n", "\n", "from stepfunctions.template.pipeline import TrainingPipeline\n", "\n", "# paste the StepFunctionsWorkflowExecutionRole ARN from above\n", "workflow_execution_role = \"\"\n", "\n", "pipeline = TrainingPipeline(\n", " estimator=estimator,\n", " role=workflow_execution_role,\n", " inputs=inputs,\n", " s3_bucket=bucket\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Visualizing the workflow \n", "\n", "You can now view the workflow definition, and visualize it as a graph. This workflow and graph represent your training pipeline from starting a training job to deploying the model." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(pipeline.workflow.definition.to_json(pretty=True))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline.render_graph()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Creating and executing the pipeline \n", "\n", "Before the workflow can be run for the first time, the pipeline must be created using the `create` method:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline.create()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now the workflow can be started by invoking the pipeline's `execute` method:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution = pipeline.execute()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Use the `list_executions` method to list all executions for the workflow you created, including the one we just started. After a pipeline is created, it can be executed as many times as needed, for example on a schedule for retraining on new data. (For purposes of this notebook just execute the workflow one time to save resources.) The output will include a list you can click through to access a view of the execution in the AWS Step Functions console." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pipeline.workflow.list_executions(html=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "While the workflow is running, you can check workflow progress inside this notebook with the `render_progress` method. This generates a snapshot of the current state of your workflow as it executes. This is a static image. Run the cell again to check progress while the workflow is running." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution.render_progress()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### BEFORE proceeding with the rest of the notebook:\n", "\n", "Wait until the workflow completes with status **Succeeded**, which will take a few minutes. You can check status with `render_progress` above, or open in a new browser tab the **Inspect in AWS Step Functions** link in the cell output. \n", "\n", "To view the details of the completed workflow execution, from model training through deployment, use the `list_events` method, which lists all events in the workflow execution." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "execution.list_events(reverse_order=True, html=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "From this list of events, we can extract the name of the endpoint that was set up by the workflow. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import re\n", "\n", "endpoint_name_suffix = re.search('endpoint\\Wtraining\\Wpipeline\\W([a-zA-Z0-9\\W]+?)\"', str(execution.list_events())).group(1)\n", "print(endpoint_name_suffix)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once we have the endpoint name, we can use it to instantiate a TensorFlowPredictor object that wraps the endpoint. This TensorFlowPredictor can be used to make predictions, as shown in the following code cell. \n", "\n", "#### BEFORE running the following code cell:\n", "\n", "Go to the [SageMaker console](https://console.aws.amazon.com/sagemaker/), click **Endpoints** in the left panel, and make sure that the endpoint status is **InService**. If the status is **Creating**, wait until it changes, which may take several minutes." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.tensorflow import TensorFlowPredictor\n", "\n", "workflow_predictor = TensorFlowPredictor('training-pipeline-' + endpoint_name_suffix)\n", "\n", "results = workflow_predictor.predict(x_test[:10])['predictions'] \n", "flat_list = [float('%.1f'%(item)) for sublist in results for item in sublist]\n", "print('predictions: \\t{}'.format(np.array(flat_list)))\n", "print('target values: \\t{}'.format(y_test[:10].round(decimals=1)))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Using the AWS Step Functions Data Science SDK, there are many other workflows you can create to automate your machine learning tasks. For example, you could create a workflow to automate model retraining on a periodic basis. Such a workflow could include a test of model quality after training, with subsequent branches for failing (no model deployment) and passing the quality test (model is deployed). Other possible workflow steps include Automatic Model Tuning, data preprocessing with AWS Glue, and more. \n", "\n", "For a detailed example of a retraining workflow, see the AWS ML Blog post [Automating model retraining and deployment using the AWS Step Functions Data Science SDK for Amazon SageMaker](https://aws.amazon.com/blogs/machine-learning/automating-model-retraining-and-deployment-using-the-aws-step-functions-data-science-sdk-for-amazon-sagemaker/)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Cleanup \n", "\n", "The workflow we created above deployed a model to an endpoint. To avoid billing charges for an unused endpoint, you can delete it using the SageMaker console. To do so, go to the [SageMaker console](https://console.aws.amazon.com/sagemaker/). Then click **Endpoints** in the left panel, and select and delete any unneeded endpoints in the list. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Extensions \n", "\n", "We've covered a lot of content in this notebook: SageMaker Processing for data transformation, Local Mode for prototyping training and inference code, Automatic Model Tuning, and SageMaker hosted training and inference. These are central elements for most deep learning workflows in SageMaker. 
Additionally, we examined how the AWS Step Functions Data Science SDK helps automate deep learning workflows after completion of the prototyping phase of a project.\n", "\n", "Besides all of the SageMaker features explored above, there are many other features that may be applicable to your project. For example, to handle common problems during deep learning model training such as vanishing or exploding gradients, **SageMaker Debugger** is useful. To manage common problems such as data drift after a model is in production, **SageMaker Model Monitor** can be applied." ] } ], "metadata": { "kernelspec": { "display_name": "conda_tensorflow_p36", "language": "python", "name": "conda_tensorflow_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" } }, "nbformat": 4, "nbformat_minor": 2 }