# Manage AutoML Workflows with AWS StepFunctions and AutoGluon on Amazon SageMaker

This notebook provides a tutorial on how to run ML experiments using AWS StepFunctions.
The state machine is able to execute different workloads based on its runtime input parameters.

We provide here a subset of the most common use-cases:

1) [Train and evaluate a ML model](#train-evaluate)

2) [Run batch predictions with pre-trained AutoGluon Model](#pretrained-batch)

3) [Train and deploy model to SageMaker Endpoint](#train-endpoint)

NB:
- Please select `conda_python3` as Notebook Kernel.
- Please consider using `Jupyter` over `Jupyter Lab` to avoid potential visualizatoin issues with `stepfunctions` library

## Configure Environment

Let's start with install AWS StepFunctions Python SDK

In [40]:
!pip install -q stepfunctions==2.2.0

Import libraries

In [1]:
from stepfunctions.workflow import Workflow
from stepfunctions.inputs import ExecutionInput
import json
from time import gmtime, strftime, sleep
from IPython.display import display, clear_output
import pandas as pd
from sklearn.model_selection import train_test_split
import sagemaker
import boto3
import os

INPUT_PARAMS_DIR = "./input/"
DATA_DIR = "./data/"
PREFIX = "automl-data"

session = sagemaker.Session()
sagemaker_bucket = session.default_bucket()

### Download sample data

__NB: Replace data s3 paths and jump to next section if you would like to use a custom dataset__

Let's download data

In [158]:
!aws s3 cp s3://sagemaker-sample-files/datasets/tabular/synthetic/churn.txt {DATA_DIR}/churn.csv

download: s3://sagemaker-sample-files/datasets/tabular/synthetic/churn.txt to data/churn.csv


In [55]:
df = pd.read_csv(f"{DATA_DIR}/churn.csv")

In [36]:
[feature for feature in df.columns if 'A' in feature]

['Account Length', 'Area Code']

Holdout split in train/test

In [56]:
train, test = train_test_split(df, test_size=.2)

Save file locally before upload

In [57]:
train.to_csv(f"{DATA_DIR}/train.csv", index=False)
test.to_csv(f"{DATA_DIR}/test.csv", index=False)
test.drop('Churn?', axis=1).to_csv(f"{DATA_DIR}/test_batch.csv", index=False, header=False)

Upload files to S3 for training

In [58]:
boto3.Session().resource('s3').Bucket(sagemaker_bucket).Object(os.path.join(PREFIX, 'train.csv')).upload_file(f"{DATA_DIR}/train.csv")
boto3.Session().resource('s3').Bucket(sagemaker_bucket).Object(os.path.join(PREFIX, 'test.csv')).upload_file(f"{DATA_DIR}/test.csv")
boto3.Session().resource('s3').Bucket(sagemaker_bucket).Object(os.path.join(PREFIX, 'test_batch.csv')).upload_file(f"{DATA_DIR}/test_batch.csv")

train_uri = f"s3://{sagemaker_bucket}/{PREFIX}/train.csv"
test_uri = f"s3://{sagemaker_bucket}/{PREFIX}/test.csv"
test_batch_uri = f"s3://{sagemaker_bucket}/{PREFIX}/test_batch.csv"
model_output_prefix = f"s3://{sagemaker_bucket}/{PREFIX}/output/"

Define resource ARNs 

__TODO find a way to retrieve ARNs automatically (maybe with Parameter Store)__

In [179]:
main_machine_arn = "arn:aws:states:eu-west-1:039573824519:stateMachine:MainStateMachineD8FB90C3-GcOHBmyXA0SP"
train_machine_arn = "arn:aws:states:eu-west-1:039573824519:stateMachine:TrainStateMachineAA65CDDB-ovlcReYQjVFQ"
deploy_machine_arn = "arn:aws:states:eu-west-1:039573824519:stateMachine:DeployStateMachine357A3963-KbeWPmnhskxz"

Attach SDK to state machines

In [180]:
main_workflow = Workflow.attach(main_machine_arn)
train_workflow = Workflow.attach(train_machine_arn)
deploy_workflow = Workflow.attach(deploy_machine_arn)

### Main State Machine

This state machine is in charge of orchestrating the execution and kickstaring both `Train` and `Deploy` state machines if required.

It includes:
- Training AutoGluon model (through external state machine)
- Evaluating trained model
- Deploying model to SageMaker Endpoint or executes SageMaker Batch Transform Job (through external state machine)

In [4]:
main_workflow

In [5]:
main_workflow.render_graph()

### Train State Machine

This state machine performs to main tasks:

1) Starts a new SageMaker Training Job using AutoGluon tabular

2) Save trained model to SageMaker Model Registry

In [6]:
train_workflow

In [7]:
train_workflow.render_graph()

### Deploy State Machine

This state machine performs:

- Online deployment, by spinning up a new SageMaker Endpoint and waiting for it to became available

- Offline deployment, by running a SageMaker Batch Transform Job and waiting for its completion

In [8]:
deploy_workflow

In [9]:
deploy_workflow.render_graph()

<a id='train-evaluate'></a>
## 1) Train and Evaluate ML Model

Sample execution that consists of __training__ a new AutoGluon model and __evaluating__ its perfomance on a fresh test set.

Let's start by loading the input parameters:

In [37]:
input_params_path = f"{INPUT_PARAMS_DIR}train_evaluate_input.json"
with open(input_params_path, "r") as input_file:
    input_params = json.load(input_file)

Replace data placeholder with train and test s3 data uri

In [38]:
input_params['Parameters']['Train']['TrainDataPath'] = train_uri
input_params['Parameters']['Train']['TestDataPath'] = test_uri
input_params['Parameters']['Train']["TrainingOutput"] = model_output_prefix

Set target label and [problem type (options: `binary`, `multiclass`, `regression`, `quantile`)](https://auto.gluon.ai/stable/api/autogluon.task.html#autogluon.tabular.TabularPredictor)

In [39]:
# Double JSON encoding is needed to successfully forward parameters from State Machine to SageMaker Training Job

ag_init_params = json.loads(json.loads(input_params['Parameters']['Train']["InitArgs"]))
ag_init_params['problem_type'] = 'binary'
ag_init_params['label'] = 'Churn?'

ag_fit_params = json.loads(json.loads(input_params['Parameters']['Train']['FitArgs']))
ag_fit_params['presets'] = 'medium_quality_faster_train'

input_params['Parameters']['Train']["InitArgs"] = json.dumps(json.dumps(ag_init_params))

Let's explore the parameters structure

In [40]:
print(f"Parameters sections: {list(input_params['Parameters'].keys())}")

Parameters sections: ['Flow', 'Train', 'Evaluation']


If we inspect the `Flow` section, we can see which stages of the pipeline have been selected.

In [41]:
print('Selected steps:', [k for k, v in input_params['Parameters']['Flow'].items() if v])
print('Unselected steps:', [k for k, v in input_params['Parameters']['Flow'].items() if not v])

Selected steps: ['Train', 'Evaluate']
Unselected steps: ['Deploy']


In [42]:
input_params

{'Parameters': {'Flow': {'Train': True, 'Evaluate': True, 'Deploy': False},
  'Train': {'TrainDataPath': 's3://sagemaker-eu-west-1-039573824519/automl-data/train.csv',
   'TestDataPath': 's3://sagemaker-eu-west-1-039573824519/automl-data/test.csv',
   'TrainingOutput': 's3://stepfunctions-mlops/output/training/',
   'InstanceCount': 1,
   'InstanceType': 'ml.m5.2xlarge',
   'FitArgs': '"{\\"presets\\": \\"medium_quality_faster_train\\"}"',
   'InitArgs': '"{\\"label\\": \\"Churn?\\", \\"problem_type\\": \\"binary\\"}"',
   'FeatureImportance': 'true',
   'Leaderboard': 'true'},
  'Evaluation': {'Threshold': '0.6', 'Metric': 'accuracy'}}}

Let's start the execution and wait for completion

In [124]:
train_execution_name = f"train-evaluate-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"
train_execution = main_workflow.execute(name=train_execution_name, inputs=input_params)

In [44]:
status = train_execution.describe()['status']
while status == 'RUNNING':
    clear_output(wait=True)
    display(train_execution.render_progress())
    sleep(60)
    status = train_execution.describe()['status']

clear_output(wait=True)
display(train_execution.render_progress())
print(f"Execution terminated, status: {status}")

Execution terminated, status: SUCCEEDED


Let's explore the execution output.

Here's some snippet from Train state machine output:

In [45]:
training_output = train_execution.get_output()['TrainStepsOutput']['Output']['TrainingOutput']

print(f"Training Job Name: {training_output['TrainingJobName']}")
print(f"Model artifacts: {training_output['ModelArtifacts']['S3ModelArtifacts']}")

Training Job Name: ef208ec4-04fd-45af-bb57-885d618daee1
Model artifacts: s3://stepfunctions-mlops/output/training/ef208ec4-04fd-45af-bb57-885d618daee1/output/model.tar.gz


Here's instead we have the evaluation step output

In [46]:
evaluation_output = train_execution.get_output()['ModelValidationLambdaOutput']['Payload']
print(f"Has model outperfomed the threshold? {evaluation_output['IsValid']}")
print("\nScores")
for metric, value in evaluation_output['Scores'].items():
      print(f"\t{metric}: {value}")

Has model outperfomed the threshold? True

Scores
	accuracy: 0.963
	balanced_accuracy: 0.9624517803708555
	mcc: 0.9261342082184997
	roc_auc: 0.9893606367594807
	f1: 0.9647954329210275
	precision: 0.9530075187969925
	recall: 0.976878612716763


<a id='pretrained-batch'></a>
## 2) Run batch predictions with pre-trained AutoGluon Model

Load an already trained model and run batch inference on new set.

Let's load a new set of input parameters:

In [47]:
input_params_path = f"{INPUT_PARAMS_DIR}pretrained_batch_input.json"
with open(input_params_path, "r") as input_file:
    input_params = json.load(input_file)

Let's explore the parameters section

In [48]:
print(f"Parameters sections: {list(input_params['Parameters'].keys())}")

Parameters sections: ['Flow', 'PretrainedModel', 'Deploy']


Fill the value of `PretrainedModel` with model trained at the previous step

In [49]:
input_params['Parameters']['PretrainedModel']['Name'] = train_execution.get_output()['TrainStepsOutput']['Output']['TrainingOutput']['TrainingJobName']
input_params['Parameters']['Deploy']['BatchOutputDataPath'] = f"s3://{sagemaker_bucket}/batch-out/"
input_params['Parameters']['Deploy']['BatchInputDataPath'] = test_batch_uri

In [51]:
print('Selected steps:', [k for k, v in input_params['Parameters']['Flow'].items() if v])
print('Unselected steps:', [k for k, v in input_params['Parameters']['Flow'].items() if not v])

Selected steps: ['Deploy']
Unselected steps: ['Train', 'Evaluate']


In [59]:
batch_execution_name = f"pretrained-batch-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"
batch_execution = main_workflow.execute(name=batch_execution_name, inputs=input_params)

In [None]:
status = batch_execution.describe()['status']
while status == 'RUNNING':
    clear_output(wait=True)
    display(batch_execution.render_progress())
    sleep(60)
    status = batch_execution.describe()['status']

clear_output(wait=True)
display(batch_execution.render_progress())
print(f"Execution terminated, status: {status}")

Let's explore brifly the predictions saved on S3

In [61]:
batch_output_uri = batch_execution.get_output()['Parameters']['Deploy']['BatchOutputDataPath']
batch_filename = 'batch_out.json'

In [62]:
!aws s3 cp {batch_output_uri}test_batch.csv.out {batch_filename}

download: s3://sagemaker-eu-west-1-039573824519/batch-out/test_batch.csv.out to ./batch_out.json


In [63]:
with open(batch_filename, "r") as input_file:
    batch_predictions = json.load(input_file)

In [64]:
batch_predictions[:10]

[['True.', 0.04348301887512207, 0.9565169811248779],
 ['True.', 0.009195685386657715, 0.9908043146133423],
 ['True.', 0.0015034675598144531, 0.9984965324401855],
 ['False.', 0.9803850054740906, 0.019614998251199722],
 ['True.', 0.018985986709594727, 0.9810140132904053],
 ['False.', 0.9465275406837463, 0.053472474217414856],
 ['True.', 0.06416893005371094, 0.9358310699462891],
 ['False.', 0.9860401749610901, 0.013959839008748531],
 ['False.', 0.9652785062789917, 0.03472146764397621],
 ['True.', 0.012667417526245117, 0.9873325824737549]]

Clean up prediction file from local memory

In [65]:
!rm {batch_filename}

<a id='train-endpoint'></a>
## 3) Train, evaluate and deploy a new AutoGluon model for online prediction

Let's bring everything together and train, evaluate and deploy model to SageMaker Endpoint.

Let's load a new set of input parameters:

In [181]:
input_params_path = f"{INPUT_PARAMS_DIR}train_endpoint_input.json"
with open(input_params_path, "r") as input_file:
    input_params = json.load(input_file)

Let's explore the parameters structure

In [182]:
print(f"Parameters sections: {list(input_params['Parameters'].keys())}")

Parameters sections: ['Flow', 'Train', 'Evaluation', 'Deploy']


Let's now replace placeholders in input parameters

In [183]:
input_params['Parameters']['Train']['TrainDataPath'] = train_uri
input_params['Parameters']['Train']['TestDataPath'] = test_uri
input_params['Parameters']['Train']["TrainingOutput"] = model_output_prefix

# Double JSON encoding is needed to successfully forward parameters from State Machine to SageMaker Training Job
ag_init_params = json.loads(json.loads(input_params['Parameters']['Train']["InitArgs"]))
ag_init_params['problem_type'] = 'binary'
ag_init_params['label'] = 'Churn?'

ag_fit_params = json.loads(json.loads(input_params['Parameters']['Train']['FitArgs']))
ag_fit_params['presets'] = 'optimize_for_deployment'

input_params['Parameters']['Train']["InitArgs"] = json.dumps(json.dumps(ag_init_params))

In [184]:
print('Selected steps:', [k for k, v in input_params['Parameters']['Flow'].items() if v])
print('Unselected steps:', [k for k, v in input_params['Parameters']['Flow'].items() if not v])

Selected steps: ['Train', 'Evaluate', 'Deploy']
Unselected steps: []


In [185]:
endpoint_execution_name = f"train-endpoint-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"
endpoint_execution = main_workflow.execute(name=endpoint_execution_name, inputs=input_params)

In [186]:
status = endpoint_execution.describe()['status']
while status == 'RUNNING':
    clear_output(wait=True)
    display(endpoint_execution.render_progress())
    sleep(60)
    status = endpoint_execution.describe()['status']

clear_output(wait=True)
display(endpoint_execution.render_progress())
print(f"Execution terminated, status: {status}")

Execution terminated, status: SUCCEEDED


Let's test the endpoint

In [187]:
endpoint_arn = endpoint_execution.get_output()['DeployStepsOutput']['Output']['CreateEndpointOutput']['EndpointArn']
endpoint_name = endpoint_arn.split('/')[-1]

In [196]:
sm_runtime = boto3.client('runtime.sagemaker')

# Loading a snippet of the test file
with open(f"{DATA_DIR}test_batch.csv", "r") as batch_file:
    batch_file.readline()
    samples = batch_file.readlines()[:10]
    
payload = {"instances": ''.join(samples)}

response = sm_runtime.invoke_endpoint(
    EndpointName=endpoint_name,
    ContentType='text/csv',
    Body=''.join(samples)
)

In [197]:
results = json.loads(response['Body'].read().decode())

Here's your live predictions

In [200]:
results

[['True.', 0.04431271553039551, 0.9556872844696045],
 ['True.', 0.02011626958847046, 0.9798837304115295],
 ['True.', 0.008425593376159668, 0.9915744066238403],
 ['False.', 0.9018359184265137, 0.09816406667232513],
 ['True.', 0.2541276216506958, 0.7458723783493042],
 ['True.', 0.42594635486602783, 0.5740536451339722],
 ['True.', 0.4489116668701172, 0.5510883331298828],
 ['False.', 0.9520736932754517, 0.047926317900419235],
 ['False.', 0.9109795093536377, 0.0890204906463623],
 ['True.', 0.05050283670425415, 0.9494971632957458]]

__Optional__: delete endpoint to avoid additional charges

In [202]:
sm_client = boto3.client('sagemaker')
sm_client.delete_endpoint(EndpointName=endpoint_name)

{'ResponseMetadata': {'RequestId': '60f01fdd-6656-4a2b-a79c-e19c5c109f81',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amzn-requestid': '60f01fdd-6656-4a2b-a79c-e19c5c109f81',
   'content-type': 'application/x-amz-json-1.1',
   'content-length': '0',
   'date': 'Wed, 26 Jan 2022 15:37:39 GMT'},
  'RetryAttempts': 0}}