# Workflow

This notebook contains the AWS Step Functions workflow definitions for the training and baseline jobs, and for the batch transform and model monitoring jobs.

This can be run after you have started the [mlops](mlops.ipynb) build and have stored `input_data`.

In [None]:
# Import the latest sagemaker, stepfunctions and boto3 SDKs
import sys
!{sys.executable} -m pip install --upgrade pip
!{sys.executable} -m pip install -qU awscli boto3 "sagemaker>=2.1.0<3"
!{sys.executable} -m pip install -qU "stepfunctions==2.0.0"
!{sys.executable} -m pip show sagemaker stepfunctions

In [None]:
import boto3
import json
import os
import time
import uuid
from botocore.exceptions import ClientError

import sagemaker
from sagemaker.image_uris import retrieve 
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput
from sagemaker.model_monitor.dataset_format import DatasetFormat

import stepfunctions
from stepfunctions import steps
from stepfunctions.inputs import ExecutionInput
from stepfunctions.workflow import Workflow

Load the `input_data` and `PROVISIONED_PRODUCT_NAME` values stored by the `mlops.ipynb` notebook, and display them.

In [None]:
# Restore the values persisted with %store by the mlops.ipynb notebook.
%store -r input_data PROVISIONED_PRODUCT_NAME
# Display both values so the reader can confirm which build/product is being used.
input_data, PROVISIONED_PRODUCT_NAME

Load configuration variables from the outputs of the provisioned Service Catalog product.

In [None]:
def get_config(provisioned_product_name):
    """Return the outputs of a provisioned Service Catalog product as a dict.

    Args:
        provisioned_product_name: Name of the provisioned product whose outputs to read.

    Returns:
        dict mapping each output's ``OutputKey`` to its ``OutputValue``.
    """
    sc = boto3.client("servicecatalog")
    config = {}
    request = {"ProvisionedProductName": provisioned_product_name}
    # The API is paginated: keep requesting pages until no NextPageToken is
    # returned so that no outputs are silently dropped.
    while True:
        response = sc.get_provisioned_product_outputs(**request)
        for out in response["Outputs"]:
            config[out["OutputKey"]] = out["OutputValue"]
        next_token = response.get("NextPageToken")
        if not next_token:
            return config
        request["PageToken"] = next_token


# Resolve the deployment configuration from the provisioned product outputs.
config = get_config(PROVISIONED_PRODUCT_NAME)
region = config["Region"]
model_name = config["ModelName"]
role = config["SageMakerRoleARN"]
workflow_role_arn = config["WorkflowRoleARN"]

# Names of the Lambda functions invoked by the workflow steps.
create_experiment_function_name = "mlops-create-experiment"
query_training_function_name = "mlops-query-training"
transform_header_function_name = "mlops-add-transform-header"
query_drift_function_name = "mlops-query-drift"

# SageMaker session and its default S3 bucket, used for all outputs below.
session = sagemaker.session.Session()
bucket = session.default_bucket()

print(f"region: {region}")
print(f"bucket: {bucket}")
print(f"sagemaker role: {role}")
print(f"workflow role: {workflow_role_arn}")

Specify the base S3 output URI for the trained model.

In [None]:
# S3 location where SageMaker writes the trained model artifacts.
output_data = dict(ModelOutputUri=f"s3://{bucket}/{model_name}/model")

## Define Training Resources

### Input Schema

Define the input schema for the Step Functions workflow; its placeholders can then be passed as arguments to the workflow resources.

In [None]:
# Placeholders for the string values supplied when the training workflow is executed.
execution_input = ExecutionInput(
    schema=dict(
        GitBranch=str,
        GitCommitHash=str,
        DataVersionId=str,
        ExperimentName=str,
        TrialName=str,
        BaselineJobName=str,
        BaselineOutputUri=str,
        TrainingJobName=str,
        ModelName=str,
    )
)

### Define the model monitor baseline

Define the environment variables

In [None]:
# Environment for the model monitor container when computing the baseline.
dataset_format = DatasetFormat.csv()
env = dict(
    dataset_format=json.dumps(dataset_format),
    dataset_source="/opt/ml/processing/input/baseline_dataset_input",
    output_path="/opt/ml/processing/output",
    # NOTE(review): CloudWatch metric publishing is disabled here; confirm whether that is required for a processing job.
    publish_cloudwatch_metrics="Disabled",
)

Define the processing inputs and outputs 

In [None]:
# Mount the baseline dataset at the path the container's `dataset_source` points to.
inputs = [
    ProcessingInput(
        input_name="baseline_dataset_input",
        source=input_data['BaselineUri'],
        destination="/opt/ml/processing/input/baseline_dataset_input",
    )
]

# Upload the container's output (statistics/constraints) to the per-execution baseline URI.
outputs = [
    ProcessingOutput(
        output_name="monitoring_output",
        source="/opt/ml/processing/output",
        destination=execution_input["BaselineOutputUri"],
    )
]

Create the baseline processing job using the sagemaker [model monitor](https://sagemaker.readthedocs.io/en/stable/amazon_sagemaker_model_monitoring.html) container.

In [None]:
# Default SageMaker model monitor container image for this region.
model_monitor_container_uri = retrieve(region=region, framework="model-monitor", version="latest")

# Use the generic Processor so the model monitor container runs with the
# environment defined above (dataset format, input/output paths).
monitor_analyzer = Processor(
    image_uri=model_monitor_container_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1800,
    env=env,
)

Test the model baseline processing job by running it inline.

In [None]:
# Optional: uncomment to run the baseline job directly from the notebook (wait=True blocks until it finishes).
# NOTE(review): `outputs` uses the ExecutionInput placeholder execution_input["BaselineOutputUri"] as its
# destination, so substitute a real S3 URI before running this inline.
# monitor_analyzer.run(inputs=inputs, outputs=outputs, wait=True)

### Defining the Training Job

Define the training job to run in parallel with the processing job.

In [None]:
# Built-in SageMaker XGBoost container image.
image_uri = sagemaker.image_uris.retrieve(region=region, framework="xgboost", version="latest")

# Estimator writing model artifacts under ModelOutputUri. The output path is a
# plain string because an ExecutionInput placeholder can't be used here.
xgb = sagemaker.estimator.Estimator(
    image_uri,
    role,
    instance_count=1,
    instance_type="ml.m4.xlarge",
    output_path=output_data['ModelOutputUri'],
)

# Hyperparameters that override the algorithm defaults (values passed as strings).
hyperparameters = dict(
    max_depth="9",
    eta="0.2",
    gamma="4",
    min_child_weight="300",
    subsample="0.8",
    objective="reg:linear",
    early_stopping_rounds="10",
    num_round="50",  # Don't stop too early or results are bad
)
xgb.set_hyperparameters(**hyperparameters)

# CSV training and validation channels.
s3_input_train, s3_input_val = (
    sagemaker.inputs.TrainingInput(s3_data=input_data[uri_key], content_type="csv")
    for uri_key in ("TrainingUri", "ValidationUri")
)
data = dict(train=s3_input_train, validation=s3_input_val)

Test the estimator directly in the notebook

In [None]:
# Optional: uncomment to train directly in the notebook (outside Step Functions) and test the estimator.
# xgb.fit(inputs=data)

## Define Training Workflow

### 1. Create the Experiment

Define the create experiment lambda.

In the future, use [ResultPath](https://docs.aws.amazon.com/step-functions/latest/dg/input-output-resultpath.html) to filter the results.

In [None]:
# Invoke the create-experiment Lambda with the experiment and trial names taken
# from the state input; its response is stored under $.CreateTrialResults.
create_experiment_step = steps.compute.LambdaStep(
    "Create Experiment",
    parameters=dict(
        FunctionName=create_experiment_function_name,
        Payload={
            "ExperimentName.$": "$.ExperimentName",
            "TrialName.$": "$.TrialName",
        },
    ),
    result_path="$.CreateTrialResults",
)

### 2a. Run processing Job

Define the processing job with specific failure handling.

In [None]:
# Baseline processing job, recorded as the "Baseline" trial component and tagged
# with the git/data version identifiers from the execution input.
baseline_step = steps.sagemaker.ProcessingStep(
    "Baseline Job",
    processor=monitor_analyzer,
    job_name=execution_input["BaselineJobName"],
    inputs=inputs,
    outputs=outputs,
    experiment_config=dict(
        ExperimentName=execution_input["ExperimentName"],
        TrialName=execution_input["TrialName"],
        TrialComponentDisplayName="Baseline",
    ),
    tags=dict(
        GitBranch=execution_input["GitBranch"],
        GitCommitHash=execution_input["GitCommitHash"],
        DataVersionId=execution_input["DataVersionId"],
    ),
    result_path="$.BaselineJobResults",
)

# A task failure of the baseline job ends in a dedicated Fail state.
baseline_step.add_catch(
    steps.states.Catch(
        error_equals=["States.TaskFailed"],
        next_step=steps.states.Fail("Baseline failed", cause="SageMakerBaselineJobFailed"),
    )
)

### 2b. Run and query training Job

Define the training job and add a validation step

In [None]:
# Training job, recorded as the "Training" trial component and tagged with the
# git/data version identifiers; the job description is stored at $.TrainingResults.
training_step = steps.TrainingStep(
    "Training Job",
    estimator=xgb,
    data=data,
    job_name=execution_input["TrainingJobName"],
    experiment_config=dict(
        ExperimentName=execution_input["ExperimentName"],
        TrialName=execution_input["TrialName"],
        TrialComponentDisplayName="Training",
    ),
    tags=dict(
        GitBranch=execution_input["GitBranch"],
        GitCommitHash=execution_input["GitCommitHash"],
        DataVersionId=execution_input["DataVersionId"],
    ),
    result_path="$.TrainingResults",
)

# A task failure of the training job ends in a dedicated Fail state.
training_step.add_catch(
    steps.states.Catch(
        error_equals=["States.TaskFailed"],
        next_step=steps.states.Fail("Training failed", cause="SageMakerTrainingJobFailed"),
    )
)

Create a model from the training job. Note that this step must follow the training step so that it can retrieve the expected model.

In [None]:
# Must follow the training test
model_step = steps.sagemaker.ModelStep(
 'Save Model',
 input_path='$.TrainingResults',
 model=training_step.get_expected_model(),
 model_name=execution_input['ModelName'],
 result_path='$.ModelStepResults'
)

Query the training results and validate that the RMSE is within an acceptable range.

In [None]:
# Maximum acceptable training RMSE; the model passes only if its RMSE is below this value.
rmse_threshold = 10

# Invoke the query-training Lambda with the training job name taken from the state input.
training_query_step = steps.compute.LambdaStep(
    'Query Training Results',
    parameters={
        "FunctionName": query_training_function_name,
        'Payload': {
            "TrainingJobName.$": '$.TrainingJobName'
        }
    },
    result_path='$.QueryTrainingResults'
)

# Reached when the RMSE is not below the threshold, i.e. the model error is too HIGH.
check_accuracy_fail_step = steps.states.Fail(
    'Model Error Too High',
    comment='RMSE error is not below the threshold'
)

check_accuracy_succeed_step = steps.states.Succeed('Model Error Acceptable')

# TODO: Update query method to query validation error using better result path
# NOTE(review): assumes the first TrainingMetrics entry returned by the Lambda is the RMSE — confirm.
threshold_rule = steps.choice_rule.ChoiceRule.NumericLessThan(
    variable=training_query_step.output()['QueryTrainingResults']['Payload']['results']['TrainingMetrics'][0]['Value'],
    value=rmse_threshold,
)

# State name derived from the threshold so the two cannot drift apart.
check_accuracy_step = steps.states.Choice(f'RMSE < {rmse_threshold}')

check_accuracy_step.add_choice(rule=threshold_rule, next_step=check_accuracy_succeed_step)
check_accuracy_step.default_choice(next_step=check_accuracy_fail_step)

### 3. Add the Error handling in the workflow

We will use the [Catch Block](https://aws-step-functions-data-science-sdk.readthedocs.io/en/stable/states.html#stepfunctions.steps.states.Catch) to perform error handling. If the Processing Job Step or Training Step fails, the flow will go into a failure state.

In [None]:
# Run the baseline job and the training pipeline as two concurrent branches.
sagemaker_jobs = steps.states.Parallel("SageMaker Jobs")
# Branch 1: compute the model monitor baseline.
sagemaker_jobs.add_branch(baseline_step)
# Branch 2: train, save the model, then check the training error.
sagemaker_jobs.add_branch(
    steps.states.Chain([
        training_step,
        model_step,
        training_query_step,
        check_accuracy_step,
    ])
)

# NOTE(review): only States.TaskFailed is caught for the group; decide whether branch-specific failures need their own handling.
sagemaker_jobs.add_catch(
    steps.states.Catch(
        error_equals=["States.TaskFailed"],
        next_step=steps.states.Fail("SageMaker Jobs failed", cause="SageMakerJobsFailed"),
    )
)

## Execute Training Workflow

Create the training workflow.

In [None]:
# Training workflow: create the experiment/trial, then run the parallel SageMaker jobs.
training_workflow_definition = steps.states.Chain([create_experiment_step, sagemaker_jobs])

training_workflow_name = f"mlops-{model_name}-training"
training_workflow = Workflow(training_workflow_name, training_workflow_definition, workflow_role_arn)
# NOTE(review): confirm how create() behaves when a state machine with this name already exists.
training_workflow.create()
training_workflow

We can also inspect the raw workflow definition and verify that the execution variables are correctly passed in.

In [None]:
# Print the generated state machine definition as pretty JSON so the ExecutionInput placeholders can be checked.
print(training_workflow.definition.to_json(pretty=True))

Now we define the inputs for the workflow.

In [None]:
# Define some dummy job and git params
job_id = uuid.uuid1().hex
git_branch = 'main'
git_commit_hash = 'xxx' 
data_verison_id = 'yyy'

# Define the experiment and trial name based on model name and job id
experiment_name = "mlops-{}".format(model_name)
trial_name = "mlops-{}-{}".format(model_name, job_id)

workflow_inputs = {
 "ExperimentName": experiment_name,
 "TrialName": trial_name,
 "GitBranch": git_branch,
 "GitCommitHash": git_commit_hash, 
 "DataVersionId": data_verison_id, 
 "BaselineJobName": trial_name, 
 "BaselineOutputUri": f"s3://{bucket}/{model_name}/monitoring/baseline/mlops-{model_name}-pbl-{job_id}",
 "TrainingJobName": trial_name,
 "ModelName": trial_name,
}
print(json.dumps(workflow_inputs))

Then execute the workflow

In [None]:
# Start an execution of the training workflow with the inputs defined above.
execution = training_workflow.execute(inputs=workflow_inputs)

Wait for the execution to complete, and output the last step.

In [None]:
# Block until the training execution finishes ...
execution_output = execution.get_output(wait=True)
# ... then display the final event it emitted.
execution.list_events()[-1]

Use [list_events](https://aws-step-functions-data-science-sdk.readthedocs.io/en/latest/workflow.html#stepfunctions.workflow.Execution.list_events) to list all events in the workflow execution.

In [None]:
# Disabled: rendering the event list as HTML was noted as buggy by the author.
# execution.list_events(html=True) # Bug

## Execute Batch Transform

Take the model we have trained and run a batch transform on the test dataset.


In [None]:
# Execution-input schema for the transform workflow; this replaces the training
# workflow's `execution_input` from here on.
# NOTE(review): "MonitorOutputUri" is declared but no step below references it and
# the transform workflow inputs do not supply it — consider removing it.
execution_input = ExecutionInput(
    schema=dict(
        GitBranch=str,
        GitCommitHash=str,
        DataVersionId=str,
        ExperimentName=str,
        TrialName=str,
        ModelName=str,
        TransformJobName=str,
        MonitorJobName=str,
        MonitorOutputUri=str,
    )
)

Define some new output paths for the transform and monitoring jobs

In [None]:
# Per-run S3 locations for the transform and monitoring outputs, plus the
# baseline location used by the training workflow run above (captured before
# `workflow_inputs` is redefined for the transform workflow).
output_data.update(
    TransformOutputUri=f"s3://{bucket}/{model_name}/transform/mlops-{model_name}-{job_id}",
    MonitoringOutputUri=f"s3://{bucket}/{model_name}/monitoring/mlops-{model_name}-{job_id}",
    BaselineOutputUri=workflow_inputs['BaselineOutputUri'],
)

### 1. Run the Transform Job

Define a transform job to take the test dataset as input. 

We can configure the batch transform to [associate prediction results](https://aws.amazon.com/blogs/machine-learning/associating-prediction-results-with-input-data-using-amazon-sagemaker-batch-transform/) with the input using the `input_filter`, `join_source`, and `output_filter` arguments.

In [None]:
# Batch transform of the test dataset with the model saved by the training workflow.
# The transformer's output_path is a plain string because an ExecutionInput
# placeholder can't be used there.
transform_step = steps.TransformStep(
    "Transform Input Dataset",
    transformer=xgb.transformer(
        instance_count=1,
        instance_type="ml.m5.large",
        assemble_with="Line",
        accept="text/csv",
        output_path=output_data["TransformOutputUri"],
    ),
    # NOTE(review): originally marked "TEMP" — confirm the intended job naming.
    job_name=execution_input["TransformJobName"],
    model_name=execution_input["ModelName"],
    data=input_data["TestUri"],
    content_type="text/csv",
    split_type="Line",
    # Send every column except the first (the target, output_amount) to the model.
    input_filter="$[1:]",
    # Join each prediction onto its input record ...
    join_source="Input",
    # ... then drop the leading target column: inputs (without output_amount) followed by the prediction.
    output_filter="$[1:]",
    result_path="$.TransformJobResults",
)

### 2. Add the Transform Header

The batch transform output does not include a header, so add it back so that the model monitor job can run against the output.

In [None]:
# File name of the transform output with header, and the header row to add
# (feature columns followed by the predicted amount).
transform_file_name = "test.csv"
header = ",".join(["duration_minutes", "passenger_count", "trip_distance", "total_amount"])

In [None]:
# Invoke the add-transform-header Lambda, passing the transform output location,
# the file name and the header row; its response is stored under $.TransformHeaderResults.
transform_header_step = steps.compute.LambdaStep(
    "Add Transform Header",
    parameters=dict(
        FunctionName=transform_header_function_name,
        Payload=dict(
            TransformOutputUri=output_data["TransformOutputUri"],
            FileName=transform_file_name,
            Header=header,
        ),
    ),
    result_path="$.TransformHeaderResults",
)

### 3. Run the Model Monitor Processing Job

Create a model monitor processing job that takes the output of the transform job.

Reference the `constraints.json` and `statistics.json` files from the output of the training baseline.

In [None]:
# Model monitor container configuration: analyze the transform output (with header)
# against the baseline constraints and statistics produced by the training workflow.
dataset_format = DatasetFormat.csv()
env = {
    "dataset_format": json.dumps(dataset_format),
    "dataset_source": "/opt/ml/processing/input/baseline_dataset_input",
    "output_path": "/opt/ml/processing/output",
    "publish_cloudwatch_metrics": "Disabled",  # NOTE(review): confirm this must be disabled for a processing job
    "baseline_constraints": "/opt/ml/processing/baseline/constraints/constraints.json",
    "baseline_statistics": "/opt/ml/processing/baseline/stats/statistics.json"
}

# S3 URIs are always "/"-separated, so build them explicitly rather than with
# os.path.join (which uses the local OS path separator).
inputs = [
    ProcessingInput(
        source=f"{output_data['TransformOutputUri']}/{transform_file_name}",  # Transform with header
        destination="/opt/ml/processing/input/baseline_dataset_input",
        input_name="baseline_dataset_input",
    ),
    ProcessingInput(
        source=f"{output_data['BaselineOutputUri']}/constraints.json",
        destination="/opt/ml/processing/baseline/constraints",
        input_name="constraints",
    ),
    ProcessingInput(
        source=f"{output_data['BaselineOutputUri']}/statistics.json",
        destination="/opt/ml/processing/baseline/stats",
        input_name="baseline",
    ),
]
outputs = [
    ProcessingOutput(
        source="/opt/ml/processing/output",
        destination=output_data['MonitoringOutputUri'],
        output_name="monitoring_output",
    ),
]

# Default SageMaker model monitor container image for this region.
model_monitor_container_uri = retrieve(region=region, framework="model-monitor", version="latest")

# Use the generic Processor so the model monitor container runs with the
# environment defined above.
monitor_analyzer = Processor(
    image_uri=model_monitor_container_uri,
    role=role,
    instance_count=1,
    instance_type="ml.m5.xlarge",
    max_runtime_in_seconds=1800,
    env=env
)

Test the model monitor processing job by running it inline.

In [None]:
# Optional: uncomment to run the monitoring job directly from the notebook (wait=True blocks until it finishes).
# It reads the transform output with header and the baseline constraints/statistics, so those S3 objects must already exist.
# monitor_analyzer.run(inputs=inputs, outputs=outputs, wait=True)

Add the monitor step

In [None]:
# Model monitor processing job comparing the transform output against the training
# baseline; recorded as its own trial component in the same experiment trial.
monitor_step = steps.sagemaker.ProcessingStep(
    "Monitor Job",
    processor=monitor_analyzer,
    job_name=execution_input["MonitorJobName"],
    inputs=inputs,
    outputs=outputs,
    experiment_config={
        'ExperimentName': execution_input["ExperimentName"],
        'TrialName': execution_input["TrialName"],
        # Distinct from the "Baseline" component that the training workflow creates in the same trial.
        'TrialComponentDisplayName': "Monitor",
    },
    tags={
        "GitBranch": execution_input["GitBranch"],
        "GitCommitHash": execution_input["GitCommitHash"],
        "DataVersionId": execution_input["DataVersionId"],
    },
    result_path='$.MonitorJobResults'
)

# A task failure of the monitor job ends in a dedicated Fail state.
monitor_step.add_catch(stepfunctions.steps.states.Catch(
    error_equals=["States.TaskFailed"],
    next_step=stepfunctions.steps.states.Fail(
        "Monitor failed", cause="SageMakerMonitorJobFailed"
    ),
))

Add the lambda step to query for violations in the processing job.

In [None]:
# Invoke the query-drift Lambda with the monitor job name from the state input;
# its response is stored under $.QueryMonitorResults.
monitor_query_step = steps.compute.LambdaStep(
    "Query Monitoring Results",
    parameters=dict(
        FunctionName=query_drift_function_name,
        Payload={"ProcessingJobName.$": "$.MonitorJobName"},
    ),
    result_path="$.QueryMonitorResults",
)

# Terminal states for the violations check.
check_violations_fail_step = steps.states.Fail(
    "Completed with Violations",
    comment="Processing job completed with violations",
)
check_violations_succeed_step = steps.states.Succeed("Completed")

# Succeed only when the reported processing job status is exactly "Completed";
# any other status falls through to the failure state.
# TODO: Check specific drift in violations
status_rule = steps.choice_rule.ChoiceRule.StringEquals(
    variable=monitor_query_step.output()["QueryMonitorResults"]["Payload"]["results"]["ProcessingJobStatus"],
    value="Completed",
)

check_violations_step = steps.states.Choice("Check Violations")
check_violations_step.add_choice(rule=status_rule, next_step=check_violations_succeed_step)
check_violations_step.default_choice(next_step=check_violations_fail_step)

Create the transform workflow definition

In [None]:
# Transform workflow: batch transform -> add header -> monitor job -> query results -> check violations.
transform_workflow_definition = steps.states.Chain(
    [transform_step, transform_header_step, monitor_step, monitor_query_step, check_violations_step]
)

transform_workflow_name = f"mlops-{model_name}-transform"
transform_workflow = Workflow(transform_workflow_name, transform_workflow_definition, workflow_role_arn)
transform_workflow.create()
transform_workflow

Define the workflow inputs based on the previous training run

In [None]:
# Define unique names for the transform and monitor baseline jobs
transform_job_name = "mlops-{}-trn-{}".format(model_name, job_id)
monitor_job_name = "mlops-{}-mbl-{}".format(model_name, job_id)

workflow_inputs = {
 "ExperimentName": experiment_name,
 "TrialName": trial_name,
 "GitBranch": git_branch,
 "GitCommitHash": git_commit_hash, 
 "DataVersionId": data_verison_id, 
 "ModelName": trial_name,
 "TransformJobName": transform_job_name, 
 "MonitorJobName": monitor_job_name,
}
print(json.dumps(workflow_inputs))

Execute the transform workflow.

In [None]:
# Start an execution of the transform workflow with the inputs defined above.
execution = transform_workflow.execute(inputs=workflow_inputs)

Wait for the execution to finish and list the last event.

In [None]:
# Block until the transform execution finishes ...
execution_output = execution.get_output(wait=True)
# ... then display the final event it emitted.
execution.list_events()[-1]

### Inspect Transform Results

Verify that we can load the transform output with the header.

In [None]:
from io import StringIO
import pandas as pd
from sagemaker.s3 import S3Downloader

# Read the transform output file (header already added by the "Add Transform Header"
# step) and load it into a DataFrame. S3 URIs always use "/", so join explicitly.
transform_output_uri = f"{output_data['TransformOutputUri']}/{transform_file_name}"
transform_body = S3Downloader.read_file(transform_output_uri)
pred_df = pd.read_csv(StringIO(transform_body), sep=",")
pred_df.head()

### Query monitoring output

If this completed with violations, let's inspect the output to see why that is the case.

In [None]:
# Load constraint_violations.json from the monitor job's output location
# (S3 URIs always use "/", so join explicitly).
violations_uri = f"{output_data['MonitoringOutputUri']}/constraint_violations.json"
violations = json.loads(S3Downloader.read_file(violations_uri))
violations

## Cleanup

Delete the workflows that we created as part of this notebook

In [None]:
# Delete both state machines created by this notebook (training first, then transform).
for state_machine in (training_workflow, transform_workflow):
    state_machine.delete()