## Prerequisites

To run this notebook in an AWS Sagemaker notebook instance, use the `conda_tensorflow2_p36` Python kernel. 
This kernel is set as the default and may take a minute to start.

A note of caution on running all cells at once through the `Cell->Run All` menu option: the Clean Up section 
at the end of this notebook deletes the resources created here through API calls, some parameters such as 
`my_email_address` need to be set by hand, and some steps are optional. We recommend running through the 
notebook step by step. 

We need a few installations so that our notebook can run Sagemaker containers in local mode and use Step Functions. 

In [None]:
# Enabling local mode training
!wget -q https://raw.githubusercontent.com/aws-samples/amazon-sagemaker-script-mode/master/local_mode_setup.sh
!wget -q https://raw.githubusercontent.com/aws-samples/amazon-sagemaker-script-mode/master/daemon.json 
!/bin/bash ./local_mode_setup.sh

In [None]:
import sys

# Upgrade to sagemaker version 2 to use Sagemaker Debugger built-in actions
!{sys.executable} -m pip install sagemaker -U

# Install stepfunctions python sdk -- we need the pre-release 2.0.0rc1 version since the current stable version 
# of stepfunctions SDK does not support Sagemaker 2.0 and above.
!{sys.executable} -m pip install stepfunctions==2.0.0rc1 -U

In [None]:
import numpy as np
import os
import sagemaker
from sagemaker.debugger import Rule, rule_configs, CollectionConfig, DebuggerHookConfig
from sagemaker.tensorflow import TensorFlow
import boto3

from stepfunctions.inputs import ExecutionInput
import stepfunctions as sf

import tensorflow as tf
from os.path import join as pjoin
import glob
import zipfile

In [None]:
# Define runtime information 
sess = sagemaker.Session()

region = sess.boto_region_name
sm_client = sess.boto_session.client('sagemaker')
cft_client = boto3.client('cloudformation')
s3 = sess.boto_session.client('s3')
account = boto3.client('sts').get_caller_identity().get('Account')
bucket = f'{account}-sagemaker-debugger-model-automation' 

# >>> provide email address for SNS topic subscription
my_email_address = ''

# sagemaker job params
train_instance_type = 'ml.m5.xlarge'
job_name_prefix = 'complex-resnet-model'

In [None]:
# create our bucket 
if region == 'us-east-1':
    # 'us-east-1' is the default and we should not specify the region
    s3.create_bucket(Bucket=bucket)
else:
    s3.create_bucket(Bucket=bucket, CreateBucketConfiguration={'LocationConstraint': region})

print(bucket)
print(region)
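
The region check in the cell above can be isolated into a small pure function, which makes the rule easy to verify without calling AWS. This is a minimal sketch; the helper name `bucket_kwargs` is hypothetical and is not part of boto3.

```python
def bucket_kwargs(bucket_name, region_name):
    """Build keyword arguments for s3.create_bucket (hypothetical helper)."""
    kwargs = {'Bucket': bucket_name}
    if region_name != 'us-east-1':
        # Every region other than the default needs an explicit constraint.
        kwargs['CreateBucketConfiguration'] = {'LocationConstraint': region_name}
    return kwargs


print(bucket_kwargs('my-bucket', 'us-east-1'))
print(bucket_kwargs('my-bucket', 'eu-west-1'))
```

In the notebook this would be used as `s3.create_bucket(**bucket_kwargs(bucket, region))`.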

### IAM roles

We need one role for our Lambda functions and another for our Step Functions workflow. Note that the IAM role attached 
to this notebook needs the `iam:PassRole` permission for the two roles below so that it can attach them to 
resources created later. The `lambda_role` role below needs permissions to create and describe Sagemaker jobs 
and to publish to SNS topics. The `sf_role` role below needs permission to invoke Lambda functions.

In [None]:
lambda_role = f'arn:aws:iam::{account}:role/lambda-sagemaker-train'
sf_role = f'arn:aws:iam::{account}:role/step-function-basic-role'
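
The permissions described above could be expressed as IAM policy documents along the following lines. This is only a sketch under assumptions: the notebook does not show the actual policies attached to these roles, the action list is inferred from the description (`sagemaker:StopTrainingJob` is assumed because the monitor function stops bad jobs), and `Resource: '*'` should be narrowed for real use.

```python
import json

# Assumed minimal policy for the Lambda role; not taken from the notebook.
lambda_policy = {
    'Version': '2012-10-17',
    'Statement': [
        {
            'Effect': 'Allow',
            'Action': [
                'sagemaker:CreateTrainingJob',
                'sagemaker:DescribeTrainingJob',
                'sagemaker:StopTrainingJob',
                'sns:Publish',
            ],
            'Resource': '*',  # placeholder; restrict to specific ARNs in practice
        }
    ],
}

# Assumed minimal policy for the Step Functions role.
sf_policy = {
    'Version': '2012-10-17',
    'Statement': [
        {
            'Effect': 'Allow',
            'Action': ['lambda:InvokeFunction'],
            'Resource': '*',  # placeholder; restrict to the two Lambda functions
        }
    ],
}

print(json.dumps(lambda_policy, indent=2))
print(json.dumps(sf_policy, indent=2))
```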

### Container images

We will use pre-built container images for TensorFlow and Sagemaker Debugger. The following cell retrieves the Docker
image URIs to use when creating our training jobs. 

In [None]:
sm_tensorflow_image = sagemaker.image_uris.retrieve(
    framework="tensorflow",
    image_scope='training',
    version='2.2',
    region=region,
    instance_type=train_instance_type,
    py_version='py37',
)
sm_debugger_image = sagemaker.image_uris.retrieve(framework="debugger", region=region)
print(sm_tensorflow_image)
print(sm_debugger_image)
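
The printed URIs follow the usual ECR layout `<account>.dkr.ecr.<region>.amazonaws.com/<repository>:<tag>`. As a quick sanity check that an image comes from the expected region, a URI can be split into its parts. This is a sketch; `parse_ecr_uri` is a hypothetical helper and the sample URI is illustrative, not the output of the cell above.

```python
def parse_ecr_uri(uri):
    """Split an ECR image URI into its components (hypothetical helper)."""
    registry, _, repo_and_tag = uri.partition('/')
    repository, _, tag = repo_and_tag.partition(':')
    # The registry host looks like <account>.dkr.ecr.<region>.amazonaws.com
    parts = registry.split('.')
    return {
        'account': parts[0],
        'region': parts[3],
        'repository': repository,
        'tag': tag,
    }


# Illustrative URI with a made-up account id.
sample_uri = '123456789012.dkr.ecr.us-west-2.amazonaws.com/tensorflow-training:2.2-cpu-py37'
print(parse_ecr_uri(sample_uri))
```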

## Setting up notifications using AWS Simple Notification Service (SNS)
Using the boto3 API, we create an SNS topic and add a subscription using any of the supported protocols, such as email or 
SMS. Here we use an email address for the topic subscription. This step
can also be done through the [AWS SNS console](https://console.aws.amazon.com/sns/home). Note that 
`SMDebugRules` is the topic name that the Sagemaker Debugger built-in 
[notification action](https://docs.aws.amazon.com/sagemaker/latest/dg/debugger-built-in-actions.html) 
publishes messages to.

In [None]:
sns = boto3.client('sns')
topicresponse = sns.create_topic(Name='SMDebugRules')
topic_arn=topicresponse['TopicArn']
sns.subscribe(TopicArn=topic_arn, Protocol='email', Endpoint=my_email_address)
print(topic_arn)

**Before proceeding, make sure to confirm your subscription by following the instructions SNS sends to your email address.
If you do not confirm your subscription, the workflow will not complain or produce an error, but you will not 
receive the notifications it publishes.**
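
To check from the notebook whether a subscription is still unconfirmed, one option is to inspect the subscriptions of the topic. A minimal sketch, assuming that `sns.list_subscriptions_by_topic` reports a real ARN in `SubscriptionArn` only once a subscription has been confirmed; the helper `unconfirmed_endpoints` and the sample data are hypothetical.

```python
def unconfirmed_endpoints(subscriptions):
    """Return endpoints whose subscription does not yet have a real ARN."""
    return [
        sub['Endpoint']
        for sub in subscriptions
        if not sub.get('SubscriptionArn', '').startswith('arn:')
    ]


# In the notebook, the list would come from:
#   sns.list_subscriptions_by_topic(TopicArn=topic_arn)['Subscriptions']
# Made-up sample data in the same shape:
sample = [
    {'Endpoint': 'a@example.com', 'SubscriptionArn': 'PendingConfirmation'},
    {'Endpoint': 'b@example.com',
     'SubscriptionArn': 'arn:aws:sns:us-east-1:123456789012:SMDebugRules:abcd'},
]
print(unconfirmed_endpoints(sample))
```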

## Model

The model's Python code can be found in the same repository as this notebook [here](code/model/model.py). The model can be configured to use `ELU` or `ReLU` activation, to add a batch normalization layer before additions, and to change the warm-up learning rate and the initial learning rate after the warm-up period.

We pack our model code and upload it to S3 below, using the bucket created above.

In [None]:
!tar -cvzf sourcedir.tar.gz -C code/model/ train.py model.py
s3.upload_file('sourcedir.tar.gz', bucket, 'source/sourcedir.tar.gz')

## Dataset

We will use the standard [CIFAR10 dataset](https://www.cs.toronto.edu/~kriz/cifar.html). We download the dataset, then upload it to S3.

In [None]:
def load_data():
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.cifar10.load_data()
    x_train = x_train.astype('float32') / 256
    x_test = x_test.astype('float32') / 256

    # Convert class vectors to binary class matrices.
    y_train = tf.keras.utils.to_categorical(y_train, num_classes=10)
    y_test = tf.keras.utils.to_categorical(y_test, num_classes=10)
    return ((x_train, y_train), (x_test, y_test))

(x_train, y_train), (x_test, y_test) = load_data()

data_dir = os.path.join(os.getcwd(), 'data')
train_dir = os.path.join(data_dir, 'train')
test_dir = os.path.join(data_dir, 'test')
os.makedirs(train_dir, exist_ok=True)
os.makedirs(test_dir, exist_ok=True)

np.save(pjoin(train_dir, 'x.npy'), x_train)
np.save(pjoin(train_dir, 'y.npy'), y_train)
np.save(pjoin(test_dir, 'x.npy'), x_test)
np.save(pjoin(test_dir, 'y.npy'), y_test)
print('Data saved locally:')
print(train_dir) 
print(test_dir)


# Upload to the bucket created above (not the session's default bucket)
train_s3 = sess.upload_data(train_dir, bucket=bucket, key_prefix='data/train')
test_s3 = sess.upload_data(test_dir, bucket=bucket, key_prefix='data/test')
print('Data upload to s3:')
print(train_s3)
print(test_s3)

## Local model training (optional)

Local mode training lets us run the training container in the same Jupyter notebook environment, which saves 
startup time and reduces testing overhead. The following step is not strictly necessary to build the workflow. 
However, it is useful to see how we can test and develop our model template locally before plugging it into the workflow. 
This saves us development time by allowing isolated testing of the model.

We have set the `num_epochs` parameter to only 2 epochs to finish the job quickly. The step should take 
less than 5 minutes to complete. You can also skip to the next section. 

In [None]:
hyperparameters = {
    'warmup_learning_rate': 0.001,
    'learning_rate': 0.1,
    'num_epochs': 2,
    'add_batch_norm': 0  # set to 0 to disable, 1 to enable batch_norm before add in res blocks
}

# Build the local estimator
local_estimator = TensorFlow(
    source_dir='code/model',
    entry_point='train.py',
    instance_type='local',
    instance_count=1,
    hyperparameters=hyperparameters,
    role=lambda_role,
    base_job_name=job_name_prefix,
    framework_version='2.2',
    py_version='py37',
    metric_definitions=[{'Name': 'loss',
                         'Regex': 'loss: (.*?) '}],
    script_mode=True
)

In [None]:
# The file:// prefix feeds local files to the local container for training.
inputs = {'train': f'file://{train_dir}',
 'test': f'file://{test_dir}'}

local_estimator.fit(inputs)
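
The `metric_definitions` entry above tells Sagemaker how to pull the loss out of the training logs with a regular expression. The sketch below applies the same pattern to a single log line with Python's `re` module to show what the capture group returns. The log line is a hypothetical Keras-style progress line; the real output depends on `train.py`.

```python
import re

# Same pattern as in metric_definitions above.
LOSS_REGEX = 'loss: (.*?) '

# Hypothetical log line, made up for illustration.
log_line = '390/390 [==============================] - 41s 105ms/step - loss: 1.8342 - accuracy: 0.3127'

match = re.search(LOSS_REGEX, log_line)
# The lazy group stops at the first space after the number.
loss = float(match.group(1)) if match else None
print(loss)
```

Note that the pattern requires a space after the value, so a loss printed as the very last token on a line would not be captured.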

### Training on Sagemaker cluster with Sagemaker Debugger (optional)

Once we are satisfied with the model, we configure Sagemaker Debugger and perform a test launch on a Sagemaker cluster. 
Note that we cannot run Sagemaker Debugger in local mode, but the model code checks whether the
model is being run in local mode, in which case it does not call the Sagemaker Debugger callbacks. 

Here, we are concerned about exploding gradients. 
See [Sagemaker Debugger examples and documents](https://github.com/awslabs/sagemaker-debugger/blob/master/docs/sagemaker.md) 
for other rules Debugger can check.

### Sagemaker Debugger actions
[Debugger Actions](https://docs.aws.amazon.com/sagemaker/latest/dg/debugger-action-on-rules.html) is a new feature 
that lets us perform an action in response to a Debugger rule firing. Here, we use the built-in _stop training_ 
action to stop the training job if training experiences exploding gradients. The built-in _email_ action is also 
useful to notify us should the Debugger rule fire. 

Built-in actions are useful mechanisms to perform a single action, for example sending a notification, when a rule fires.
For more complex reactions to training problems, the next section incorporates Debugger rules within a Step Functions 
workflow that can iteratively tweak the model while tracking the history and sending detailed notification messages that
include the history and current status of the model training.

Although we have set `num_epochs` to 30 epochs, the Debugger rule is expected to fire and trigger the built-in `StopTraining`
action to stop the training job as soon as Debugger detects convergence problems. A notification will also be sent.
This step should take about 10-15 minutes to complete. 

In [None]:
# Setting built-in actions to stop bad training job and notify if debugger rule is triggered.
debugger_actions = rule_configs.ActionList(
    rule_configs.StopTraining(),
    rule_configs.Email(my_email_address)
)

# Adding exploding tensor rule to look for exploding gradient issues.
dbg_rules = [
    Rule.sagemaker(
        base_config=rule_configs.exploding_tensor(),
        rule_parameters={
            "tensor_regex": ".*gradient",
            "only_nan": "False"
        },
        actions=debugger_actions
    )
]

hook_config = DebuggerHookConfig(
    hook_parameters={
        "save_interval": "100"
    },
    collection_configs=[
        CollectionConfig(
            name="gradients",
            parameters={
                "save_interval": "100",
            }
        )
    ]
)

hyperparameters = {
    'warmup_learning_rate': 0.1,
    'learning_rate': 0.1,
    'num_epochs': 30,
    'add_batch_norm': 0  # set to 0 to disable, 1 to enable batch_norm before add in res blocks
}

estimator = TensorFlow(
    script_mode=True,
    source_dir='code/model',
    entry_point='train.py',
    instance_type=train_instance_type,
    instance_count=1,
    hyperparameters=hyperparameters,
    role=sagemaker.get_execution_role(),
    base_job_name=job_name_prefix,
    framework_version='2.2',
    py_version='py37',
    rules=dbg_rules,
    debugger_hook_config=hook_config,
    metric_definitions=[{'Name': 'loss',
                         'Regex': 'loss: (.+?) '}],
    output_path=f's3://{bucket}'
)

inputs = {'train': train_s3,
          'test': test_s3}

# wait=True, logs=True will show us all the job logs, which may be convenient for testing.
# We can also view the same logs in CloudWatch by following the link to logs on Sagemaker
# training jobs page of our job.
estimator.fit(inputs, wait=True, logs=True)
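
To make the rule parameters above more concrete, the sketch below illustrates the idea behind an exploding-tensor check in plain Python: select tensors whose names match `tensor_regex`, then flag any that contain NaN values or, when `only_nan` is false, infinite values as well. This is an illustration of the concept only and is not Debugger's implementation; the function names and tensor values are made up.

```python
import math
import re


def matching_tensors(tensors, tensor_regex):
    """Keep only tensors whose name matches the regex (illustrative)."""
    pattern = re.compile(tensor_regex)
    return {name: values for name, values in tensors.items() if pattern.search(name)}


def is_exploding(values, only_nan=False):
    """True if any value is NaN, or infinite when only_nan is False."""
    for value in values:
        if math.isnan(value):
            return True
        if not only_nan and math.isinf(value):
            return True
    return False


# Made-up tensors, flattened to lists of floats for simplicity.
tensors = {
    'gradients/dense/kernel': [0.1, float('inf'), -0.3],
    'dense/kernel': [0.5, 0.2],
    'gradients/conv/kernel': [0.01, 0.02],
}

flagged = [
    name
    for name, values in matching_tensors(tensors, '.*gradient').items()
    if is_exploding(values, only_nan=False)
]
print(flagged)
```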

## Monitoring job status

We can monitor the status of our training job through the [Sagemaker console](https://console.aws.amazon.com/sagemaker/home), 
or by using the Sagemaker API without leaving our notebook, as shown below.

In [None]:
estimator.latest_training_job.describe()
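
The dictionary returned by `describe()` is large. A small helper can reduce it to the fields that matter here: the job status and the status of each Debugger rule. This is a sketch; `summarize_job` is a hypothetical helper, and the sample dictionary is made up to mirror the shape of a `DescribeTrainingJob` response.

```python
def summarize_job(description):
    """Extract job and Debugger rule statuses from a job description."""
    rules = {
        rule['RuleConfigurationName']: rule['RuleEvaluationStatus']
        for rule in description.get('DebugRuleEvaluationStatuses', [])
    }
    return {
        'job_status': description.get('TrainingJobStatus'),
        'secondary_status': description.get('SecondaryStatus'),
        'rules': rules,
    }


# Made-up sample in the shape of a DescribeTrainingJob response.
sample_description = {
    'TrainingJobStatus': 'Stopped',
    'SecondaryStatus': 'Stopped',
    'DebugRuleEvaluationStatuses': [
        {'RuleConfigurationName': 'ExplodingTensor',
         'RuleEvaluationStatus': 'IssuesFound'},
    ],
}
print(summarize_job(sample_description))
```

In the notebook this would be called as `summarize_job(estimator.latest_training_job.describe())`.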

## AWS Lambda functions

We use two AWS Lambda functions for the workflow. One creates a training job. The other monitors the status of the
job and the Debugger rule checks, publishes notifications to SNS, stops bad training jobs, and plans the next training configuration.

### First time creating
The following piece of code zips our functions and creates Lambda functions.

In [None]:
lambda_client = boto3.client('lambda')
# We will pack all python files under the lambda folder
files = glob.glob('code/lambda/*.py')
print(files)
# Collect the function names, which we will use later when cleaning up.
lambda_funcs = []
for f in files:
    base = os.path.splitext(f)[0]
    zipname = base + '.zip'
    func_name = os.path.basename(base)
    lambda_funcs.append(func_name)
    # Package each function file into its own zip archive.
    with zipfile.ZipFile(zipname, mode='w') as zf:
        zf.write(f, arcname=func_name + '.py')
    s3.upload_file(zipname, Bucket=bucket, Key=f'source/{zipname}')

    response = lambda_client.create_function(
        FunctionName=func_name,
        Runtime='python3.7',
        Role=lambda_role,
        Handler=f'{func_name}.lambda_handler',
        Code={
            'S3Bucket': bucket,
            'S3Key': f'source/{zipname}'
        },
        Description='Queries a SageMaker training job and returns the results.',
        Timeout=15,
        MemorySize=128
    )
    print(f"Creating {func_name}, API response status: {response['ResponseMetadata']['HTTPStatusCode']}")
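
The `Handler` setting above means each file under `code/lambda/` is expected to define a `lambda_handler(event, context)` function. The skeleton below is a hypothetical sketch of the shape such a handler could take; the real logic lives in the repository and is not reproduced here. It assumes the handler receives the shared `state` in the event and returns the updated state under a top-level `body` key, which is what the workflow defined later selects with `output_path='$.Payload.body'`.

```python
def lambda_handler(event, context):
    """Hypothetical handler skeleton; not the notebook's actual Lambda code."""
    # Copy the incoming state so the event itself is not mutated.
    state = dict(event['state'])
    history = dict(state.get('history', {}))
    history['num_monitor_transitions'] = history.get('num_monitor_transitions', 0) + 1
    state['history'] = history
    # Step Functions places the Lambda return value under "Payload", so the
    # workflow reads the updated state from Payload.body.
    return {'statusCode': 200, 'body': {'state': state}}


# Local smoke test with a made-up event.
result = lambda_handler({'state': {'history': {'num_monitor_transitions': 2}}}, None)
print(result)
```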

### Updating functions code if needed (Optional)
As we develop our workflow, we often need to update the Lambda functions. This snippet lets us update
the function code without leaving our notebook environment or deleting and recreating our Lambda functions. We can also 
edit the code in the [AWS Lambda editor](https://console.aws.amazon.com/lambda/home).

In [None]:
lambda_client = boto3.client('lambda')
files = glob.glob('code/lambda/*.py')
fileszip = []
for f in files:
    base = os.path.splitext(f)[0]
    zipname = base + '.zip'
    print(zipname)
    fileszip.append(zipname)
    func_name = os.path.basename(base)
    with zipfile.ZipFile(zipname, mode='w') as zf:
        zf.write(f, arcname=func_name + '.py')
    s3.upload_file(zipname, Bucket=bucket, Key=f'source/{zipname}')
    response = lambda_client.update_function_code(
        FunctionName=func_name,
        S3Bucket=bucket,
        S3Key=f'source/{zipname}',
        Publish=False
    )
    print(response['ResponseMetadata']['HTTPStatusCode'])

## AWS Step Function workflow

We define a number of input configuration parameters for our workflow.
The workflow provides a few safeguards to avoid runaway situations. For example, `max_monitor_transitions` ensures 
that the state machine does not loop indefinitely in the monitor state. Set `max_monitor_transitions` 
to a high but reasonable value, in accordance with the expected run time of the state machine and with 
the `wait_time` parameter, which controls how long the state machine 
waits before querying the status of the job again. 

In [None]:
execution_input = ExecutionInput(schema={
    'init_warmup_learning_rate': float,
    'learning_rate': float,
    'add_batch_norm': int,
    'bucket': str,
    'base_job_name': str,
    'instance_type': str,
    'region': str,
    'num_epochs': int,
    'debugger_save_interval': int,
    'max_num_retraining': int,
    'max_monitor_transitions': int
})
# wait_time specifies how many seconds the monitor step waits before querying the status of the training job.
# We wait for 5 minutes between status checks.
wait_time = 5*60
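
Together, `wait_time` and `max_monitor_transitions` put a rough upper bound on how long the state machine can spend waiting between status checks. The short calculation below works this out for the values used in this notebook (a 5-minute wait, and the `max_monitor_transitions` of 200 passed at execution time). It is an estimate only, since it ignores the time spent inside the Lambda functions themselves.

```python
wait_time = 5 * 60             # seconds between status checks
max_monitor_transitions = 200  # value passed when the workflow is executed

# Upper bound on total waiting time before the monitor limit is reached.
max_monitoring_seconds = wait_time * max_monitor_transitions
max_monitoring_hours = max_monitoring_seconds / 3600
print(f'{max_monitoring_seconds} seconds, about {max_monitoring_hours:.1f} hours')
```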

### Defining the state machine

In [None]:
# Create the initial state that steps pass to each other. 
init_state = sf.steps.states.Pass(
    'init',
    parameters={
        'state': {
            'history': {
                'num_warmup_adjustments': 0,
                'num_batch_layer_adjustments': 0,
                'num_retraining': 0,
                'latest_job_name': '',
                'num_learning_rate_adjustments': 0,
                'num_monitor_transitions': 0
            },
            'next_action': 'launch_new',  # planning a new launch next
            'job_status': '',
            'run_spec': {
                'warmup_learning_rate': execution_input['init_warmup_learning_rate'],
                'learning_rate': execution_input['learning_rate'],
                'add_batch_norm': execution_input['add_batch_norm'],
                'bucket': execution_input['bucket'],
                'base_job_name': execution_input['base_job_name'],
                'instance_type': execution_input['instance_type'],
                'region': execution_input['region'],
                'sm_role': lambda_role,
                'num_epochs': execution_input['num_epochs'],
                'debugger_save_interval': execution_input['debugger_save_interval']
            }
        }
    },
)

training_step = sf.steps.compute.LambdaStep(
    'train',
    parameters={
        'FunctionName': 'launch_training_job',
        'Payload': {
            'state.$': '$.state',
            'sm_tensorflow_image': sm_tensorflow_image,
            'sm_debugger_image': sm_debugger_image
        }
    },
    output_path='$.Payload.body',
    wait_for_callback=False
)

wait_step = sf.steps.states.Wait(
    'wait',
    seconds=wait_time,
    comment='Wait for training job to make some progress.'
)

monitor_step = sf.steps.compute.LambdaStep(
    'monitor',
    parameters={
        'FunctionName': 'monitor',
        'Payload': {
            'topic_arn': topic_arn,
            'max_num_retraining': execution_input['max_num_retraining'],
            'max_monitor_transitions': execution_input['max_monitor_transitions'],
            'state.$': '$.state'
        }
    },
    output_path='$.Payload.body'
)
choice_step = sf.steps.states.Choice('choice')
# possible values for next_action: 'launch_new', 'end', 'monitor'

succeed_step = sf.steps.states.Succeed('succeed')

choice_step.add_choice(
 rule=sf.steps.choice_rule.ChoiceRule.StringEquals(variable='$.state.next_action', value='launch_new'),
 next_step=training_step
)
choice_step.add_choice(
 rule=sf.steps.choice_rule.ChoiceRule.StringEquals(variable='$.state.next_action', value='monitor'),
 next_step=wait_step
)
choice_step.add_choice(
 rule=sf.steps.choice_rule.ChoiceRule.StringEquals(variable='$.state.next_action', value='end'),
 next_step=succeed_step
)
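
The control flow defined above can be traced without AWS by simulating the transitions in plain Python: `train`, `wait` and `monitor` run in sequence, and `choice` routes on `next_action`. This is a sketch for intuition only. The `scripted_monitor` stand-in simply replays a fixed list of decisions and does not reflect the logic of the real `monitor` Lambda function.

```python
def scripted_monitor(actions):
    """Stand-in for the monitor Lambda that replays scripted decisions."""
    remaining = list(actions)

    def monitor(state):
        state = dict(state)
        state['next_action'] = remaining.pop(0)
        return state

    return monitor


def simulate(monitor, state, max_steps=100):
    """Walk the init/train/wait/monitor/choice loop and record each step."""
    routes = {'launch_new': 'train', 'monitor': 'wait', 'end': 'succeed'}
    trace = ['init']
    step = 'train'
    while step != 'succeed' and len(trace) < max_steps:
        trace.append(step)
        if step == 'train':
            step = 'wait'
        elif step == 'wait':
            step = 'monitor'
        elif step == 'monitor':
            state = monitor(state)
            step = 'choice'
        elif step == 'choice':
            step = routes[state['next_action']]
    trace.append(step)
    return trace


# Keep monitoring once, then relaunch training, then finish.
trace = simulate(scripted_monitor(['monitor', 'launch_new', 'end']),
                 {'next_action': 'launch_new'})
print(' -> '.join(trace))
```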

In [None]:
workflow_definition = sf.steps.Chain([init_state, training_step, wait_step, monitor_step, choice_step])
workflow = sf.workflow.Workflow('sagemaker-model-dev-workflow-with-debugger', definition=workflow_definition, role=sf_role)

In [None]:
print(workflow.definition.to_json(pretty=True))
workflow.render_graph(portrait=False)

In [None]:
workflow.create()
# Calling update lets us change the definition of an existing workflow.
# Note that if the workflow already exists, workflow.create() does not complain.
workflow.update(workflow_definition)

We have set `"num_epochs": 5` so that the execution finishes faster (likely in 30 minutes or a little more). If you are interested in seeing 
the full workflow optimize the model architecture, try `"num_epochs": 30`, which should finish in about 3-4 hours.

In [None]:
workflow.execute(inputs={'init_warmup_learning_rate': 0.1,
 'num_epochs': 5,
 'learning_rate': 0.1,
 'add_batch_norm': 0,
 'bucket': bucket,
 'base_job_name': job_name_prefix,
 'instance_type': train_instance_type,
 'region': region,
 'debugger_save_interval': 100,
 'max_num_retraining': 30,
 'max_monitor_transitions': 200
 })

## Clean up
Finally, we can delete the resources we have created above. 

In [None]:
# delete SNS topic
sns.delete_topic(TopicArn=topic_arn)

In [None]:
# delete Step Functions workflow
workflow.delete()

In [None]:
# delete Lambda functions
for lambda_f in lambda_funcs:
 lambda_client.delete_function(FunctionName=lambda_f)

In [None]:
# Empty the bucket
s3service = boto3.resource('s3')
bucket_obj = s3service.Bucket(bucket)
# This step may take a few minutes.
bucket_obj.objects.all().delete()

In [None]:
# WARNING: This will delete the working bucket. To rerun the notebook, you would have to provide a new bucket
bucket_obj.delete()

In [None]:
# Delete the CloudFormation stack.
# WARNING: THIS WILL DELETE THIS NOTEBOOK AND ANY CODE CHANGES.
cft_client.delete_stack(StackName='debugger-cft-stack')