# Deriving inference insights for your ML models and sending low confidence predictions to human review workflows using Amazon SageMaker Model Monitor and Amazon A2I

When an ML model is deployed in production, monitoring the model is important for maintaining the quality of predictions. While the statistical properties of the training data are known in advance, incoming real-life data can gradually deviate over time and negatively impact the predictive power of your model, a phenomenon known as data drift. Detecting these conditions in production can be challenging and time-consuming, requiring a system that captures incoming real-time data, performs statistical analyses, defines rules to detect drift, and sends alerts for rule violations. Furthermore, the process must be repeated for every new iteration of the model.

Amazon SageMaker Model Monitor enables you to efficiently and continuously monitor machine learning models in production. You can set alerts to detect deviations in the model quality and proactively take corrective actions, such as retraining models, auditing upstream systems, or fixing data quality issues. You can use insights from Amazon SageMaker Model Monitor to choose ML inferences to send to humans for review using Amazon Augmented AI (Amazon A2I). Amazon A2I makes it easy to integrate a human review into your machine learning workflow. This allows you to automatically have humans step in and review data when a model is unable to make a high-confidence prediction, or to audit model predictions on an ongoing basis.

In this post we show how to set up an ML workflow on Amazon SageMaker to train an XGBoost model for breast cancer prediction. We then deploy the model with a real-time endpoint, capture a fraction of the data sent to the endpoint, create a baseline from the training dataset, launch a model monitoring schedule, review baseline constraints and statistics, and trigger a human review loop for below-threshold predictions. Finally, we show how the human loop workers review and update the predictions, which can then be used to update your original training dataset for model retraining.


## Contents

1. [Preprocess your input dataset](#Preprocess_your_input_dataset)
1. [Train and deploy an XGBoost Model](#Step_2_-_Train_and_deploy_a_XGBoost_Model)
1. [Generate baselines and start an Amazon SageMaker Model Monitor](#Step_3_-_Start_the_Amazon_SageMaker_Model_Monitor)
1. [Review the model monitor reports and derive insights](#Step_4_-_Review_the_model_monitor_reports_and_derive_insights)
1. [Set up Human Review loops using Amazon A2I](#Step_5_-_Setup_Human_Review_loops_using_Amazon_A2I)
1. [Cleaning up](#Clean_up)
1. [Conclusion](#Conclusion)

## Prerequisites

### Create your workforce

This step requires you to use the AWS Console. We will create a private workteam and add only one user (you) to it. To create a private team:

1. Go to AWS Console > Amazon SageMaker > Labeling workforces
1. Click "Private" and then "Create private team".
1. Enter the desired name for your private workteam.
1. Enter your own email address in the "Email addresses" section.
1. Enter the name of your organization and a contact email to administer the private workteam.
1. Click "Create Private Team".
1. The AWS Console should now return to AWS Console > Amazon SageMaker > Labeling workforces. Your newly created team should be visible under "Private teams". Next to it you will see an ARN which is a long string that looks like arn:aws:sagemaker:region-name-123456:workteam/private-crowd/team-name. **Please enter this ARN in the cell below**
1. You should get an email from no-reply@verificationemail.com that contains your workforce username and password.
1. In AWS Console > Amazon SageMaker > Labeling workforces, click on the URL in Labeling portal sign-in URL. Use the email/password combination from Step 8 to log in (you will be asked to create a new, non-default password).
1. This is your private worker's interface. When a human review task is created later in this notebook, it should appear in this window. You can invite your colleagues to participate by clicking the "Invite new workers" button.


### Setup Amazon SageMaker Studio Notebook

1. Onboard to Amazon SageMaker Studio using the quick start (https://docs.aws.amazon.com/sagemaker/latest/dg/onboard-quick-start.html). Please attach the [AmazonAugmentedAIFullAccess](https://console.aws.amazon.com/iam/home#/policies/arn%3Aaws%3Aiam%3A%3Aaws%3Apolicy%2FAmazonAugmentedAIFullAccess) permissions policy to the IAM role you create during Studio onboarding to run this notebook.
1. When the user is created and active, click Open Studio.
1. In the Studio landing page, choose File --> New --> Terminal.
1. In the terminal, enter the following code:
 * git clone https://github.com/aws-samples/amazon-a2i-sample-jupyter-notebooks
1. Open the notebook by choosing “Amazon-A2I-with-Amazon-SageMaker-Model-Monitor.ipynb” in the amazon-a2i-sample-jupyter-notebooks folder in the left pane of the Studio landing page.

## Step 1 - Preprocess your input dataset

Let's start by specifying the S3 bucket and prefix that you want to use for training and model data. This should be within the same region as the Notebook Instance, training, and hosting.

In [None]:
bucket = '<your s3 bucket name>'
prefix = '<enter a prefix for your notebook execution>'
WORKTEAM_ARN= "<enter the ARN of your private labeling workforce>"
 
# Define IAM role
import boto3
import re
import sagemaker
from sagemaker import get_execution_role


role = get_execution_role()
print("RoleArn: {}".format(role))
sess = sagemaker.Session() 

### Let's import some data science libraries and the Amazon SageMaker Python SDK

In [None]:
import numpy as np # For matrix operations and numerical processing
import pandas as pd # For munging tabular data
import matplotlib.pyplot as plt # For charts and visualizations
from IPython.display import Image # For displaying images in the notebook
from IPython.display import display # For displaying outputs in the notebook
from time import gmtime, strftime # For labeling SageMaker models, endpoints, etc.
import sys # For writing outputs to notebook
import math # For ceiling function
import json # For parsing hosting outputs
import os # For manipulating filepath names 
from sagemaker.predictor import csv_serializer # Converts strings for HTTP POST requests on inference

### Let's load our dataset

Before creating the template, we will load a tabular dataset, split the data into train and test, store the test data in Amazon S3, and train a machine learning model. The dataset we use is on Breast Cancer prediction and can be found [here](http://archive.ics.uci.edu/ml). 

Reference: [1] Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

Based on the input features, we will first train a model to detect a benign or malignant label.

In [None]:
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

def generatedf(split_ratio):
    """Loads the dataset into a dataframe and generates train/test splits"""
    data = load_breast_cancer()
    df = pd.DataFrame(data.data, columns = data.feature_names)
    df['label'] = data.target
    # Move the label to the first column, as the XGBoost container expects
    cols = list(df.columns)
    cols = cols[-1:] + cols[:-1]
    df = df[cols]
    train, test = train_test_split(df, test_size=split_ratio, random_state=42)
    return train, test

train_data, test_data = generatedf(0.2)
train_data.head()

In [None]:
#create a separate dataset for Model Monitoring schedule
mm_data = test_data.drop(['label'],axis=1)

In [None]:
#store the datasets locally
train_data.to_csv('train.csv',index = None, header=None)
test_data.to_csv('test.csv', index = None, header=None)
mm_data.to_csv('mm.csv', index = None, header=None)

In [None]:
# load the data into S3
sess.upload_data('train.csv', bucket=bucket, key_prefix=os.path.join(prefix, 'train'))
sess.upload_data('test.csv', bucket=bucket, key_prefix=os.path.join(prefix, 'test'))

Because we're training with the CSV file format, we'll create s3_inputs that our training function can use as a pointer to the files in S3, which also specifies that the content type is CSV.

In [None]:
#load the train and test data filenames from Amazon S3
s3_input_train = sagemaker.s3_input(s3_data='s3://{}/{}/train'.format(bucket, prefix), content_type='csv')
s3_input_validation = sagemaker.s3_input(s3_data='s3://{}/{}/test/'.format(bucket, prefix), content_type='csv')

In [None]:
test_data.shape

Amazon SageMaker's XGBoost container expects data in the libSVM or CSV data format. For this example, we'll stick to CSV. Note that the first column must be the target variable and the CSV should not include headers. Also, notice that although repetitive it's easiest to do this after the train|validation|test split rather than before. This avoids any misalignment issues due to random reordering.
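
The layout requirement described above (label first, no header row) can be sketched in plain Python. The helper name `to_xgboost_csv`, the column names, and the two rows are made up for illustration and are not part of the notebook.

```python
import csv
import io

def to_xgboost_csv(rows, columns, label_column):
    """Return CSV text with the label as the first column and no header row."""
    # Put the label first, then the remaining features in their original order
    ordered = [label_column] + [c for c in columns if c != label_column]
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    for row in rows:
        writer.writerow([row[c] for c in ordered])
    return buffer.getvalue()

# Hypothetical two-row dataset for illustration only
columns = ["mean radius", "mean texture", "label"]
rows = [
    {"mean radius": 17.99, "mean texture": 10.38, "label": 0},
    {"mean radius": 13.54, "mean texture": 14.36, "label": 1},
]
csv_text = to_xgboost_csv(rows, columns, "label")
print(csv_text)
```

The notebook achieves the same layout with pandas by reordering the columns in `generatedf` and calling `to_csv` with `header=None`.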

## Step 2 - Train and deploy an XGBoost model

The `XGBoost` (eXtreme Gradient Boosting) is a popular and efficient open-source implementation of the gradient boosted trees algorithm. Gradient boosting is a supervised learning algorithm that attempts to accurately predict a target variable by combining an ensemble of estimates from a set of simpler and weaker models. The XGBoost algorithm performs well in machine learning competitions because of its robust handling of a variety of data types, relationships, distributions, and the variety of hyperparameters that you can fine-tune. You can use XGBoost for regression, classification (binary and multiclass), and ranking problems.

You can use the new release of the XGBoost algorithm either as an Amazon SageMaker built-in algorithm or as a framework to run training scripts in your local environments. Using the built-in algorithm version of XGBoost is simpler than using the open source version, because you don’t have to write a training script. If you don’t need the features and flexibility of open source XGBoost, consider using the built-in version. For information about using the Amazon SageMaker XGBoost built-in algorithm, see [XGBoost Algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) in the Amazon SageMaker Developer Guide.

First we'll need to specify the ECR container location for Amazon SageMaker's implementation of XGBoost.

In [None]:
from sagemaker.amazon.amazon_estimator import get_image_uri
container = get_image_uri(boto3.Session().region_name, 'xgboost', '1.0-1')

### Create the XGBoost Estimator

Now that we have the XGBoost container, we use it to construct an estimator using the SageMaker Estimator API and initiate a training job. This XGBoost built-in algorithm runs directly on the input datasets. Amazon SageMaker XGBoost currently only trains using CPUs. It is a memory-bound (as opposed to compute-bound) algorithm. So, a general-purpose compute instance (for example, M5) is a better choice than a compute-optimized instance (for example, C4). Further, we recommend that you have enough total memory in selected instances to hold the training data. Although it supports the use of disk space to handle data that does not fit into main memory (the out-of-core feature available with the libsvm input mode), writing cache files onto disk slows the algorithm processing time.


In [None]:
sess = sagemaker.Session()

xgb = sagemaker.estimator.Estimator(container,
 role, 
 train_instance_count=1, 
 train_instance_type='ml.m5.2xlarge',
 output_path='s3://{}/{}/output'.format(bucket, prefix),
 sagemaker_session=sess)

### Specify Hyperparameters

Hyperparameters are set by users to facilitate the estimation of model parameters from data. 

- `max_depth` Controls how deep each tree within the algorithm can be built. Deeper trees can lead to better fit, but are more computationally expensive and can lead to overfitting. Typically, you need to explore some trade-offs in model performance between a large number of shallow trees and a smaller number of deeper trees.
- `subsample` Controls sampling of the training data. This hyperparameter can help reduce overfitting, but setting it too low can also starve the model of data.
- `num_round` Controls the number of boosting rounds. This value specifies the models that are subsequently trained using the residuals of previous iterations. Again, more rounds should produce a better fit on the training data, but can be computationally expensive or lead to overfitting.
- `eta` Controls how aggressive each round of boosting is. Smaller values lead to more conservative boosting.
- `gamma` Controls how aggressively trees are grown. Larger values lead to more conservative models.
- `min_child_weight` Also controls how aggressively trees are grown. Larger values lead to a more conservative model.

For other hyperparameters and more details, refer to [XGBoost Parameters](https://xgboost.readthedocs.io/en/release_0.90/parameter.html#parameters-for-tree-booster).
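
To build intuition for `eta` and `num_round`, here is a toy sketch of step-size shrinkage. It is not XGBoost's tree-building procedure: the "weak learner" simply fits the current residual of a single target value, and the function name `boost_constant` is hypothetical. It only illustrates that each round adds `eta` times the fitted correction, so a smaller `eta` approaches the target more slowly (more conservatively) and needs more rounds.

```python
def boost_constant(target, eta, num_round):
    """Toy boosting: each round fits the current residual and adds eta times that fit."""
    prediction = 0.0
    history = []
    for _ in range(num_round):
        residual = target - prediction   # what the next weak learner tries to fit
        prediction += eta * residual     # shrink the update by the learning rate
        history.append(prediction)
    return history

slow = boost_constant(target=1.0, eta=0.2, num_round=10)
fast = boost_constant(target=1.0, eta=0.8, num_round=10)
print("eta=0.2 after 3 rounds:", round(slow[2], 3))
print("eta=0.8 after 3 rounds:", round(fast[2], 3))
```

In this toy setting the remaining error after `n` rounds is `(1 - eta) ** n`, which is why a lower `eta` is usually paired with a higher `num_round`.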

In [None]:
xgb.set_hyperparameters(max_depth=5,
 eta=0.2,
 gamma=4,
 min_child_weight=6,
 subsample=0.8,
 silent=0,
 objective='binary:logistic',
 num_round=100)

xgb.fit({'train': s3_input_train, 'validation': s3_input_validation}) 

### Deploy the XGBoost Model

Now that we've trained the `XGBoost` algorithm on our data, let's deploy a model that's hosted behind a real-time endpoint. As a first step, let's specify the Amazon S3 locations for storing data, reports, and processing code.

In [None]:
print(f"Bucket is: {bucket}")
print(f"Prefix is: {prefix}")
data_capture_prefix = '{}/datacapture'.format(prefix)
s3_capture_upload_path = 's3://{}/{}'.format(bucket, data_capture_prefix)
reports_prefix = '{}/reports'.format(prefix)
s3_report_path = 's3://{}/{}'.format(bucket,reports_prefix)
code_prefix = '{}/code'.format(prefix)

print("Capture path: {}".format(s3_capture_upload_path))
print("Report path: {}".format(s3_report_path))

We now specify the capture option called [DataCaptureConfig](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_DataCaptureConfig.html). You can capture the request payload, the response payload, or both with this configuration. The capture configuration applies to all variants.

In [None]:
from sagemaker.model_monitor import DataCaptureConfig

endpoint_name = 'xgb-breast-cancer-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
print("EndpointName={}".format(endpoint_name))

data_capture_config = DataCaptureConfig(
 enable_capture=True,
 sampling_percentage=100,
 destination_s3_uri=s3_capture_upload_path)

After you fit an XGBoost Estimator, you can host the newly created model in SageMaker by calling deploy on the estimator to create a SageMaker endpoint. The endpoint runs a SageMaker-provided XGBoost model server that hosts the model produced when you called fit.

The deploy function returns a Predictor object, which you can use to do inference on the Endpoint hosting your XGBoost model. Each Predictor provides a predict method which can do inference with numpy arrays, Python lists, or strings. After inference arrays or lists are serialized and sent to the XGBoost model server, predict returns the result of inference against your model.

**Note:** We are specifying an ml.m5.2xlarge instance below for our endpoint. This will incur charges for as long as the endpoint is active. For more details, see [Amazon SageMaker Pricing](https://aws.amazon.com/sagemaker/pricing/).

In [None]:
xgb_predictor = xgb.deploy(initial_instance_count=1,
 instance_type='ml.m5.2xlarge',
 endpoint_name=endpoint_name,
 data_capture_config=data_capture_config)

In [None]:
xgb_predictor.content_type = 'text/csv'
xgb_predictor.serializer = csv_serializer

### Invoke the deployed model using the endpoint

You can now send data to this endpoint to get inferences in real time. Because you enabled data capture in the previous steps, the request and response payloads, along with some additional metadata, are saved in the Amazon S3 location that you specified in DataCaptureConfig.

In [None]:
# EXISTING-ENDPOINT
# Use this code to instantiate the predictor object if you've already created an endpoint earlier and it's running. Provide the correct endpoint name below and uncomment both code lines below
# endpoint_name = "xgb-breast-cancer-2020-07-22-21-33-23"
#xgb_predictor_2 = sagemaker.predictor.RealTimePredictor(endpoint=endpoint_name,content_type='text/csv')

In [None]:
import time
def invoke_endpoint(predictor, data_df, rows=500):
    """Sends each row of data_df to the endpoint and returns the predicted probabilities"""
    print("Sending test traffic to the endpoint {}. \nPlease wait...".format(endpoint_name))

    predictions = ''
    i = 0
    for row in data_df.to_numpy():
        # Serialize the feature row as a single CSV line
        payload = ",".join([str(num) for num in row])
        response = predictor.predict(payload)
        if i % 10 == 0:
            print(response)
        predictions = ','.join([predictions, response.decode('utf-8')])
        i = i + 1

    return np.fromstring(predictions[1:], sep=',')

In [None]:
# if this is not your first time running this notebook and you already have an endpoint running, 
# please execute the cell marked EXISTING-ENDPOINT above following the instructions provided in comments and
# replace the xgb_predictor variable name below accordingly
predictions = invoke_endpoint(xgb_predictor, test_data[list(test_data.columns)[1:]])

The Data Capture Config we specified earlier for the endpoint should have captured the input sent to the endpoint, the output received from the endpoint, as well as some metadata. Let's verify that the capture files were written to S3.
**Note:** The upload of capture files might take a minute even if the endpoint invocation step above is complete. If you get an error when you execute the cell below give it a minute and then try again.

In [None]:
s3_client = boto3.Session().client('s3')
current_endpoint_capture_prefix = '{}/{}'.format(data_capture_prefix, endpoint_name)
# Poll until capture files appear; re-list on every pass so the loop can terminate
while True:
    capture_files = sess.list_s3_files(bucket, current_endpoint_capture_prefix)
    if capture_files:
        print("Found Capture Files:")
        print("\n ".join(capture_files))
        break
    time.sleep(10)

Let's now review the content of the S3 objects.

In [None]:
import json

def get_obj_body(obj_key):
 return s3_client.get_object(Bucket=bucket, Key=obj_key).get('Body').read().decode("utf-8")

capture_file = get_obj_body(capture_files[-1])

print(json.dumps(json.loads(capture_file.split('\n')[0]), indent=2))
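
Each line of a capture file is a JSON record. As a rough sketch, the request features and the predicted probability could be pulled out of one record as shown below. The sample record is hand-written for illustration, and its field layout (`captureData`, `endpointInput`, `endpointOutput`, `eventMetadata`) is an assumption about the capture format that you should check against the output printed by the cell above. The helper `parse_capture_line` is hypothetical.

```python
import json

# Illustrative record only: values are made up, and real records may carry more fields
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "12.47,18.6,81.09", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv; charset=utf-8", "mode": "OUTPUT",
                           "data": "0.96", "encoding": "CSV"},
    },
    "eventMetadata": {"eventId": "example-event-id", "inferenceTime": "2020-07-22T21:40:00Z"},
})

def parse_capture_line(line):
    """Extract the request features and the predicted probability from one capture record."""
    record = json.loads(line)
    capture = record["captureData"]
    features = [float(v) for v in capture["endpointInput"]["data"].split(",")]
    probability = float(capture["endpointOutput"]["data"])
    return features, probability

features, probability = parse_capture_line(sample_line)
print(features, probability)
```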

### Understanding the prediction result

The objective we defined in the hyperparameters for model training was `binary:logistic`. With this objective, XGBoost performs logistic regression for binary classification and outputs a probability between 0 and 1. The training loss is the negative log-likelihood of a Bernoulli distribution; for more details refer to [Bernoulli Distribution](https://en.wikipedia.org/wiki/Bernoulli_distribution). In our example above, the value of 0.96 in the endpointOutput indicates a 96% probability that the request belongs to label 1. Note that in the scikit-learn `load_breast_cancer` dataset used here, label 1 denotes a benign result and label 0 denotes a malignant result.
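
As a minimal sketch of what "output as probability" means, the logistic (sigmoid) function maps a raw score, interpreted as log-odds, to a value between 0 and 1. The function names and the 0.5 cut-off for a hard label are assumptions for illustration; the endpoint itself returns only the probability.

```python
import math

def sigmoid(margin):
    """Map a raw score (log-odds) to a probability in (0, 1)."""
    return 1.0 / (1.0 + math.exp(-margin))

def to_label(probability, threshold=0.5):
    """Turn a probability into a hard 0/1 label; the 0.5 cut-off is an assumption."""
    return 1 if probability >= threshold else 0

margin = math.log(0.96 / 0.04)   # log-odds that correspond to a 0.96 probability
probability = sigmoid(margin)
print(round(probability, 2), to_label(probability))
```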

## Step 3 - Start the Amazon SageMaker Model Monitor

Amazon SageMaker Model Monitor continuously monitors the quality of Amazon SageMaker machine learning models in production. It enables developers to set alerts for deviations in model quality, such as data drift. Early and proactive detection of these deviations enables you to take corrective actions, such as retraining models, auditing upstream systems, or fixing data quality issues, without having to monitor models manually or build additional tooling.

### Create a Baseline

The baseline calculations of statistics and constraints are needed as a standard against which data drift and other data quality issues can be detected. Amazon SageMaker Model Monitor provides a built-in container that can suggest the constraints automatically for CSV and flat JSON input.

The training dataset that you used to train the model is usually a good baseline dataset. The training dataset data schema and the inference dataset schema should exactly match (the number and order of the features). From the training dataset, you can ask Amazon SageMaker to suggest a set of baseline constraints and generate descriptive statistics to explore the data. For this example, upload the training dataset that was used to train the pretrained model included in this example. If you already have it in Amazon S3, you can point to it directly.

In [None]:
# copy over the training dataset to Amazon S3 (if you already have it in Amazon S3, you could reuse it)
baseline_prefix = prefix + '/baselining'
baseline_data_prefix = baseline_prefix + '/data'
baseline_results_prefix = baseline_prefix + '/results'

baseline_data_uri = 's3://{}/{}'.format(bucket,baseline_data_prefix)
baseline_results_uri = 's3://{}/{}'.format(bucket, baseline_results_prefix)
print('Baseline data uri: {}'.format(baseline_data_uri))
print('Baseline results uri: {}'.format(baseline_results_uri))

In [None]:
# Upload the training file for baselining
training_data_file = open("train.csv", 'rb')
s3_key = os.path.join(baseline_prefix, 'data', 'train.csv')
boto3.Session().resource('s3').Bucket(bucket).Object(s3_key).upload_fileobj(training_data_file)

In [None]:
# Start the baseline job
from sagemaker.model_monitor import DefaultModelMonitor
from sagemaker.model_monitor.dataset_format import DatasetFormat

my_default_monitor = DefaultModelMonitor(
 role=role,
 instance_count=1,
 instance_type='ml.m5.4xlarge',
 volume_size_in_gb=100,
 max_runtime_in_seconds=3600,
)

my_default_monitor.suggest_baseline(
 baseline_dataset=baseline_data_uri+'/train.csv',
 dataset_format=DatasetFormat.csv(header=False), 
 output_s3_uri=baseline_results_uri,
 wait=True,
 logs=False
)

### Inspect baseline job results

Now that the baseline job has completed, let's inspect the results. Two files are generated:
- `statistics.json` This file is expected to have columnar statistics for each feature in the dataset that is analyzed. See the schema for this file in the [Schema for Statistics](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-statistics.html).
- `constraints.json` This file is expected to have the constraints on the features observed. See the schema for this file in the [Schema for Constraints](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-constraints.html)

In [None]:
#s3_client = boto3.Session().client('s3')
#result = s3_client.list_objects(Bucket=bucket, Prefix=baseline_results_prefix)
result = sess.list_s3_files(bucket, baseline_results_prefix)
print("Found Files:")
print("\n ".join(result))

Let's inspect the contents of the statistics.json file for a couple of entries.

In [None]:
# if header was set to False for the baselining creation function, the column names will look like "_c0," "_c1," etc.
# Let's print the statistics for a couple of rows
import pandas as pd

baseline_job = my_default_monitor.latest_baselining_job
schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict["features"])
schema_df.head(2)

Let's inspect the contents of the constraints.json file for a couple of entries. This should contain the constraints applied to each of the columns. In our case we see that the non-negative constraint is applied.

In [None]:
constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["features"])
constraints_df.head(2)

In [None]:
# Let's check the monitoring configurations specified
constraints_mon_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict["monitoring_config"])
constraints_mon_df

The Monitoring Configuration determines the Monitor's actions. For more details please refer to [Model Monitor documentation](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html)

- `emit_metrics` Amazon SageMaker emits CloudWatch metrics for each feature/column observed in the dataset in the /aws/sagemaker/Endpoints/data-metric namespace with EndpointName and ScheduleName dimensions
- `datatype_check_threshold` During the baseline step, the generated constraints suggest the inferred data type for each column. The monitoring_config.datatype_check_threshold parameter can be tuned to adjust the threshold on when it is flagged as a violation
- `domain_content_threshold` If there are more unknown values for a String field in the current dataset than in the baseline dataset, this threshold can be used to dictate if it needs to be flagged as a violation
- `distribution_constraints.comparison_threshold` **This value is used to detect drift.** If the calculated distance between the two distributions is above the value set for the comparison_threshold, this causes a failure that is treated as a violation in the violation report. In our case, Model Monitor uses the comparison method of "Robust" based on the [two-sample K-S test](https://en.m.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test) to quantify the distance between the empirical distribution of our test dataset and the cumulative distribution of the baseline dataset.

For more details on baseline constraints please refer to [Schema for Constraints](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-constraints.html)
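
To make the `comparison_threshold` idea concrete, the sketch below computes a plain two-sample Kolmogorov-Smirnov statistic (the largest gap between two empirical cumulative distributions) and compares it with a threshold. This is not Model Monitor's implementation: its "Robust" method may differ in detail, and the feature values and the threshold of 0.5 are made up for illustration.

```python
def ks_statistic(sample_a, sample_b):
    """Two-sample K-S statistic: the largest gap between the two empirical CDFs."""
    a, b = sorted(sample_a), sorted(sample_b)
    i = j = 0
    largest_gap = 0.0
    for value in sorted(set(a) | set(b)):
        # Advance each pointer past every observation <= value
        while i < len(a) and a[i] <= value:
            i += 1
        while j < len(b) and b[j] <= value:
            j += 1
        largest_gap = max(largest_gap, abs(i / len(a) - j / len(b)))
    return largest_gap

baseline = [1.0, 2.0, 3.0, 4.0, 5.0]    # made-up baseline feature values
similar = [1.5, 2.5, 3.5, 4.5, 5.5]     # roughly the same distribution
shifted = [6.0, 7.0, 8.0, 9.0, 10.0]    # clearly drifted distribution
comparison_threshold = 0.5              # hypothetical threshold for this sketch

for name, current in [("similar", similar), ("shifted", shifted)]:
    distance = ks_statistic(baseline, current)
    status = "violation" if distance > comparison_threshold else "ok"
    print(name, round(distance, 2), status)
```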

### Create Monitoring Schedule

With a Monitoring Schedule, Amazon SageMaker can kick off processing jobs at a specified frequency to analyze the data collected during a given period. Amazon SageMaker provides a pre-built container for performing analysis on tabular datasets. In the processing job, Amazon SageMaker compares the dataset for the current analysis with the provided baseline statistics and constraints, and generates a violations report. In addition, CloudWatch metrics are emitted for each feature under analysis. Let's create a monitoring schedule that runs hourly.

In [None]:
from sagemaker.model_monitor import CronExpressionGenerator
from time import gmtime, strftime

mon_schedule_name = 'xgb-breast-cancer-a2i-blog-monitor-schedule-' + strftime("%Y-%m-%d-%H-%M-%S", gmtime())
my_default_monitor.create_monitoring_schedule(
 monitor_schedule_name=mon_schedule_name,
 endpoint_input=xgb_predictor.endpoint,
 output_s3_uri=s3_report_path,
 statistics=my_default_monitor.baseline_statistics(),
 constraints=my_default_monitor.suggested_constraints(),
 schedule_cron_expression=CronExpressionGenerator.hourly(),
 enable_cloudwatch_metrics=True,

)

**Note:** Please ensure that the MaxRunTime for your Model Monitor is smaller than the CRON schedule you specify. Otherwise you will get an error - CreateMonitoringSchedule operation: stopping condition should be smaller than schedule cadence

Let's invoke the endpoint continuously to generate traffic for the model monitor to pick up.

In [None]:
from threading import Thread
from time import sleep
import time

endpoint_name=xgb_predictor.endpoint
runtime_client = boto3.client('runtime.sagemaker')

def invoke_endpoint(ep_name, file_name, runtime_client):
    """Sends every row of the CSV file to the endpoint, one request per second"""
    with open(file_name, 'r') as f:
        for row in f:
            payload = row.rstrip('\n')
            response = runtime_client.invoke_endpoint(EndpointName=ep_name,
                                                      ContentType='text/csv',
                                                      Body=payload)
            response['Body'].read()
            time.sleep(1)

def invoke_endpoint_forever():
    while True:
        invoke_endpoint(endpoint_name, 'mm.csv', runtime_client)

thread = Thread(target = invoke_endpoint_forever)
thread.start()

# Note that you need to stop the kernel to stop the invocations

In [None]:
# Check the Monitor Schedule status
desc_schedule_result = my_default_monitor.describe_schedule()
print('Schedule status: {}'.format(desc_schedule_result['MonitoringScheduleStatus']))

## Step 4 - Review Model Monitoring Execution Output

In [None]:
# Check execution status every 5 minutes
mon_executions = my_default_monitor.list_executions()
print("We created an hourly schedule above and it will kick off executions ON the hour (plus 0 - 20 min buffer).\nWe will check execution status every 5 minutes...")

while len(mon_executions) == 0:
 print("Waiting for Model Monitor to pick up execution results..." + strftime("%Y-%m-%d-%H-%M-%S", gmtime()))
 time.sleep(300)
 mon_executions = my_default_monitor.list_executions() 

Let's look at the latest execution status and print the report name.

In [None]:
latest_execution = mon_executions[-1] # latest execution's index is -1, second to last is -2 and so on..
latest_execution.wait(logs=False)

print("Latest execution status: {}".format(latest_execution.describe()['ProcessingJobStatus']))
print("Latest execution result: {}".format(latest_execution.describe()['ExitMessage']))

latest_job = latest_execution.describe()
if (latest_job['ProcessingJobStatus'] != 'Completed'):
 print("====STOP==== \n No completed executions to inspect further. Please wait till an execution completes or investigate previously reported failures.")

In [None]:
report_uri=latest_execution.output.destination
print('Report Uri: {}'.format(report_uri))

### Review Violations Report

The violations file is generated as the output of a MonitoringExecution. It lists the results of evaluating the constraints (specified in the constraints.json file) against the current dataset that was analyzed. The Amazon SageMaker Model Monitor pre-built container provides the following violation checks:

- `data_type_check` 
- `completeness_check`
- `baseline_drift_check`
- `missing_column_check`
- `extra_column_check`
- `categorical_values_check`

For more details about the violation checks please refer to the Model Monitor documentation [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-interpreting-violations.html)

In [None]:
violations = my_default_monitor.latest_monitoring_constraint_violations()
constraints_df = pd.json_normalize(violations.body_dict["violations"])
constraints_df.head(10)

As can be seen above, the model monitor detected a data_type_check violation in one of the requests sent to the endpoint. Data drift (also called model drift in this post) is indicated by a **baseline_drift_check** violation; because none was triggered, we do not see drift with our endpoint. To enable proactive action on these metrics, check the documentation [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-interpreting-cloudwatch.html) for how to emit them to Amazon CloudWatch. You can also visualize the results of monitoring in Amazon SageMaker Studio, where you can view monitoring results for your endpoints as charts, see the jobs being monitored, and deep dive into each job. For information about the onboarding process for using Studio, see [Onboard to Amazon SageMaker Studio](https://docs.aws.amazon.com/sagemaker/latest/dg/gs-studio-onboard.html).
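
As a sketch of how the violations report could be checked programmatically for drift, the snippet below filters a report for `baseline_drift_check` entries. The sample report is made up, and its keys (`feature_name`, `constraint_check_type`, `description`) are an assumption about the report layout that you should confirm against the dataframe printed above. The helper `drifted_features` is hypothetical.

```python
# Made-up report for illustration; confirm the key names against your own report
sample_report = {
    "violations": [
        {"feature_name": "_c3", "constraint_check_type": "data_type_check",
         "description": "Example description of a data type mismatch."},
        {"feature_name": "_c7", "constraint_check_type": "baseline_drift_check",
         "description": "Example description of a drift distance above the threshold."},
    ]
}

def drifted_features(report):
    """Return the names of features that triggered a baseline_drift_check violation."""
    return [v["feature_name"] for v in report.get("violations", [])
            if v.get("constraint_check_type") == "baseline_drift_check"]

drifted = drifted_features(sample_report)
print("Drift detected on:", drifted if drifted else "no features")
```

In the notebook, the same function could be applied to `violations.body_dict`, assuming the layout matches.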

In [None]:
# View distribution of output probabilities
from matplotlib import pyplot as plt
plt.xlim([-0.1, 1.1])
bin_size=0.05
bins = np.arange(-0.1, 1.1, bin_size) # fixed bin size

plt.hist(predictions, bins=bins, alpha=0.5)
plt.title('Distribution of probabilities')
plt.xlabel(f'probabilities (bin size = {bin_size})')
plt.ylabel('count')

plt.show()

### Evaluate results

There are two perspectives to be considered here to determine next steps for our experiment. 

- **`Model Monitor Violations`** We only saw the data_type_check violation from the Model Monitor. We did not see a drift violation. In our case, Model Monitor uses the comparison method of "Robust" based on the [two-sample K-S test](https://en.m.wikipedia.org/wiki/Kolmogorov%E2%80%93Smirnov_test) to quantify the distance between the empirical distribution of our test dataset and the cumulative distribution of the baseline dataset. This distance did not exceed the value set for the “comparison_threshold”. The prediction results are aligned with the results in the training dataset. For more details refer to [Model Monitor Interpret Results](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-interpreting-violations.html)

- **`Probability Distribution of Prediction results`** We used a test dataset of 114 requests. Of these, the model assigns about 60% of the requests a probability above 90% of belonging to label 1, about 30% a probability below 10%, and the remaining 10% or so are indeterminate, as shown in the chart above. In the scikit-learn `load_breast_cancer` dataset, label 1 denotes benign and label 0 denotes malignant, so the first group is predicted benign and the second malignant.

As a next step, you need to send the predictions with output probabilities of > 10% and < 90% (the ones the model is unable to make with sufficient confidence) to a domain expert who can look at the model results and identify whether the tumor is benign or malignant. You use Amazon A2I to set up a human review workflow and define conditions for activating the review loop.
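The split described above can be sketched in plain Python. The function name and the sample probabilities are hypothetical. The sketch also assumes that values exactly equal to a threshold are not sent to review, which matches the strict `>` and `<` comparisons used for the review range.

```python
def bucket_predictions(probabilities, lower=0.1, upper=0.9):
    """Split probabilities into benign, malignant, and needs_review buckets."""
    buckets = {"benign": [], "needs_review": [], "malignant": []}
    for p in probabilities:
        if lower < p < upper:
            # Not confident either way: candidate for human review
            buckets["needs_review"].append(p)
        elif p >= upper:
            buckets["malignant"].append(p)
        else:
            buckets["benign"].append(p)
    return buckets


# Hypothetical model outputs, for illustration only
sample_probabilities = [0.02, 0.5, 0.97, 0.1, 0.9]
sample_buckets = bucket_predictions(sample_probabilities)
for label, values in sample_buckets.items():
    print(label, values)
```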

## Step 5 - Set up a human review loop for low-confidence detection using Amazon A2I

Amazon Augmented AI (Amazon A2I) makes it easy to build the workflows required for human review of ML predictions. Amazon A2I brings human review to all developers, removing the undifferentiated heavy lifting associated with building human review systems or managing large numbers of human reviewers.

To incorporate Amazon A2I into your human review workflows you need:

* A worker task template to create a worker UI. The worker UI displays your input data, such as documents or images, and instructions to workers. It also provides interactive tools that the worker uses to complete your tasks. For more information, see [A2I instructions overview](https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-worker-template-console.html).

* A human review workflow, also referred to as a flow definition. You use the flow definition to configure your human workforce and provide information about how to accomplish the human review task. To learn more, see [create flow definition](https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html).

When using a custom task type, you start a human loop using the Amazon Augmented AI Runtime API. When you call StartHumanLoop in your custom application, a task is sent to human reviewers.
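The StartHumanLoop call can be gated on the model's confidence so that only uncertain predictions reach reviewers. The following is a minimal sketch under stated assumptions: the helper `start_review_if_uncertain` and the `FakeA2IClient` stand-in are hypothetical, and the thresholds of 0.1 and 0.9 mirror the review range used in this post. In a real run you would pass `boto3.client('sagemaker-a2i-runtime')` instead of the fake client.

```python
import json
import uuid


def start_review_if_uncertain(a2i_client, flow_definition_arn, probability,
                              input_content, lower=0.1, upper=0.9):
    """Start a human loop only if probability is strictly inside (lower, upper)."""
    if not (lower < probability < upper):
        return None  # confident prediction, no review needed
    human_loop_name = str(uuid.uuid4())
    a2i_client.start_human_loop(
        HumanLoopName=human_loop_name,
        FlowDefinitionArn=flow_definition_arn,
        HumanLoopInput={"InputContent": json.dumps(input_content)},
    )
    return human_loop_name


class FakeA2IClient:
    """Hypothetical stand-in for the A2I runtime client, used for a dry run."""

    def __init__(self):
        self.calls = []

    def start_human_loop(self, **kwargs):
        self.calls.append(kwargs)
        return {"HumanLoopArn": "placeholder-arn"}


dry_run_client = FakeA2IClient()
skipped = start_review_if_uncertain(dry_run_client, "placeholder-flow-arn", 0.95, {"row": "ROW_0"})
started = start_review_if_uncertain(dry_run_client, "placeholder-flow-arn", 0.5, {"row": "ROW_1"})
print("skipped:", skipped, "started:", started)
```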

In this section, you set up a human review loop for low-confidence detections in Amazon A2I. It includes the following steps:

* Create or choose your workforce
* Create a human task UI
* Create the flow definition
* Trigger conditions for human loop activation
* Check the human loop status and wait for reviewers to complete the task

Let's now initialize some variables that we need in the subsequent steps.

In [None]:
import io
import uuid
import time

timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
# Amazon SageMaker client
sagemaker_client = boto3.client('sagemaker')

# Amazon Augmented AI (Amazon A2I) runtime client
a2i = boto3.client('sagemaker-a2i-runtime')

# Amazon S3 client 
s3 = boto3.client('s3')

# Flow definition name - this value must be unique per account and region. You can also provide your own value here.
flowDefinitionName = 'fd-xgb-breast-cancer-' + timestamp

# Task UI name - this value must be unique per account and region. You can also provide your own value here.
taskUIName = 'ui-xgb-breast-cancer-' + timestamp

# Flow definition outputs
OUTPUT_PATH = f's3://{bucket}/{prefix}/a2i-results'

### Create the human task UI

Create a human task UI resource by providing a UI template written in Liquid HTML. This template is rendered for the human workers whenever a human loop is required. For over 70 pre-built UIs, check: https://github.com/aws-samples/amazon-a2i-sample-task-uis

In [None]:
template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<style>
 table, tr, th, td {
 border: 1px solid black;
 border-collapse: collapse;
 padding: 5px;
 }
</style>

<crowd-form>
 <div>
 <h1>Instructions</h1>
 <p>Please review the predictions in the Predictions table based on the input data table below, and make corrections where appropriate. </p>
 <p> Here are the labels: </p>
 <p> 0: Benign </p>
 <p> 1: Malignant </p>
 </div>
 <div>
 <h3> Breast cancer dataset </h3>
 <div id="my_table"> {{ task.input.table | skip_autoescape }} </div>
 </div>
 <br>
 <h1> Predictions Table </h1>
 <table>
 <tr>
 <th>ROW NUMBER</th>
 <th>MODEL PREDICTION</th>
 <th>AGREE/DISAGREE WITH ML RATING?</th>
 <th>YOUR PREDICTION</th>
 <th>CHANGE REASON </th>
 </tr>

 {% for pair in task.input.Pairs %}

 <tr>
 <td>{{ pair.row }}</td>
 <td><crowd-text-area name="predicted{{ forloop.index }}" value="{{ pair.prediction }}"></crowd-text-area></td>
 <td>
 <p>
 <input type="radio" id="agree{{ forloop.index }}" name="rating{{ forloop.index }}" value="agree" required>
 <label for="agree{{ forloop.index }}">Agree</label>
 </p>
 <p>
 <input type="radio" id="disagree{{ forloop.index }}" name="rating{{ forloop.index }}" value="disagree" required>
 <label for="disagree{{ forloop.index }}">Disagree</label> 
 </p> 
 </td>
 <td>
 <p>
 <input type="text" name="true_prediction{{ forloop.index }}" placeholder="Enter your prediction" />
 </p>
 </td>
 <td>
 <p>
 <input type="text" name="change_reason{{ forloop.index }}" placeholder="Explain why you changed the prediction" />
 </p>
 </td>
 </tr>

 {% endfor %}

 </table>
</crowd-form>
"""

def create_task_ui():
    '''
    Creates a Human Task UI resource.

    Returns:
        dict: the CreateHumanTaskUi response, which contains HumanTaskUiArn
    '''
    response = sagemaker_client.create_human_task_ui(
        HumanTaskUiName=taskUIName,
        UiTemplate={'Content': template})
    return response

In [None]:
# Create task UI
humanTaskUiResponse = create_task_ui()
humanTaskUiArn = humanTaskUiResponse['HumanTaskUiArn']
print(humanTaskUiArn)

### Create the Flow Definition
In this section, we're going to create a flow definition. Flow definitions allow us to specify:
- The workforce that your tasks will be sent to. 
- The instructions that your workforce will receive. This is called a worker task template. 
- Where your output data will be stored.

This demo is going to use the API, but you can optionally create this workflow definition in the console as well. For more details and instructions, see: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html.

In [None]:
create_workflow_definition_response = sagemaker_client.create_flow_definition(
    FlowDefinitionName=flowDefinitionName,
    RoleArn=role,
    HumanLoopConfig={
        "WorkteamArn": WORKTEAM_ARN,
        "HumanTaskUiArn": humanTaskUiArn,
        "TaskCount": 1,
        "TaskDescription": "Review the model predictions and determine if you agree or disagree. Assign a label of 1 to indicate malignant result or 0 to indicate a benign result based on your review of the inference request",
        "TaskTitle": "Using Model Monitor and A2I Demo"
    },
    OutputConfig={
        "S3OutputPath": OUTPUT_PATH
    }
)
flowDefinitionArn = create_workflow_definition_response['FlowDefinitionArn'] # let's save this ARN for future use

In [None]:
# Describe flow definition - status should be active
for x in range(60):
    describeFlowDefinitionResponse = sagemaker_client.describe_flow_definition(FlowDefinitionName=flowDefinitionName)
    print(describeFlowDefinitionResponse['FlowDefinitionStatus'])
    if (describeFlowDefinitionResponse['FlowDefinitionStatus'] == 'Active'):
        print("Flow Definition is active")
        break
    time.sleep(2)
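The polling loop above ends silently if the flow definition never becomes active. A more defensive variant could raise an error on failure or timeout. The following is a minimal sketch: the helper name `wait_for_status` and its parameters are hypothetical, and the demo at the end uses canned responses instead of a real `describe_flow_definition` call.

```python
import time


def wait_for_status(describe, status_key, target="Active", failed=("Failed",),
                    attempts=60, delay_seconds=2.0):
    """Call describe() until status_key equals target, or raise on failure/timeout."""
    for _ in range(attempts):
        status = describe()[status_key]
        if status == target:
            return status
        if status in failed:
            raise RuntimeError(f"Resource entered status {status!r}")
        time.sleep(delay_seconds)
    raise TimeoutError(f"Status did not reach {target!r} after {attempts} attempts")


# Dry run with canned responses, for illustration only
canned_responses = iter([
    {"FlowDefinitionStatus": "Initializing"},
    {"FlowDefinitionStatus": "Active"},
])
final_status = wait_for_status(lambda: next(canned_responses),
                               "FlowDefinitionStatus", delay_seconds=0)
print(final_status)
```

With the real client, the first argument would be a callable such as `lambda: sagemaker_client.describe_flow_definition(FlowDefinitionName=flowDefinitionName)`.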

### Set trigger conditions for human loop activation

As we discussed before, the probability distribution of the prediction results shows that predictions with an output probability of > 10% and < 90% are the ones the model cannot make with sufficient confidence. If they occur consistently, they may indicate model drift and need to be investigated. So we set up the trigger condition for the Amazon A2I human loop to be within this range.
**Note:** Please ignore the dataframe warning displayed.

In [None]:
# assign our original test dataset 
model_data_categorical = test_data[list(test_data.columns)[1:]] 

LOWER_THRESHOLD = 0.1
UPPER_THRESHOLD = 0.9
small_payload_df = model_data_categorical.head(len(predictions))
small_payload_df['prediction_prob'] = predictions
small_payload_df_res = small_payload_df.loc[
 (small_payload_df['prediction_prob'] > LOWER_THRESHOLD) &
 (small_payload_df['prediction_prob'] < UPPER_THRESHOLD)
]
print(small_payload_df_res.shape)
small_payload_df_res.head(10)

In [None]:
# Use the filtered dataframe so the count reflects only the rows sent to review
print(f"{len(small_payload_df_res)} out of {len(predictions)} samples or " +
      '{:.1%} of the payload was sent to review.'.format(len(small_payload_df_res)/len(predictions)))

In [None]:
# Note that the prediction is in terms of a probability from 0 to 1 for a discrete label of 1 indicating malignant condition
low_conf_predictions = small_payload_df_res['prediction_prob'].to_list()
NUM_TO_REVIEW = len(low_conf_predictions) # You can change this number as desired
item_list = [{'row': "ROW_{}".format(x), 'prediction': low_conf_predictions[x]} for x in range(NUM_TO_REVIEW)]
item_list

In [None]:
ip_content = {"table": small_payload_df_res.reset_index().drop(columns = ['index']).head(NUM_TO_REVIEW).to_html(), 
 'Pairs': item_list
 }

In [None]:
# Activate human loops
import json
humanLoopName = str(uuid.uuid4())

start_loop_response = a2i.start_human_loop(
 HumanLoopName=humanLoopName,
 FlowDefinitionArn=flowDefinitionArn,
 HumanLoopInput={
 "InputContent": json.dumps(ip_content)
 }
 )

### Check status of task completion and human loop

Let's check the status of the human loop and record it if it has completed.

In [None]:
completed_human_loops = []
resp = a2i.describe_human_loop(HumanLoopName=humanLoopName)
print(f'HumanLoop Name: {humanLoopName}')
print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
print('\n')
 
if resp["HumanLoopStatus"] == "Completed":
 completed_human_loops.append(resp)

Wait for workers to complete their tasks

In [None]:
workteamName = WORKTEAM_ARN[WORKTEAM_ARN.rfind('/') + 1:]
print("Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!")
print('https://' + sagemaker_client.describe_workteam(WorkteamName=workteamName)['Workteam']['SubDomain'])

Check status of human loop again

In [None]:
completed_human_loops = []
resp = a2i.describe_human_loop(HumanLoopName=humanLoopName)
print(f'HumanLoop Name: {humanLoopName}')
print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
print('\n')
 
if resp["HumanLoopStatus"] == "Completed":
 completed_human_loops.append(resp)

Let's inspect the results of the human review tasks

In [None]:
import re
import pprint

pp = pprint.PrettyPrinter(indent=4)

for resp in completed_human_loops:
    # Strip the bucket prefix from the output URI to get the object key
    splitted_string = re.split('s3://' + bucket + '/', resp['HumanLoopOutput']['OutputS3Uri'])
    output_bucket_key = splitted_string[1]

    response = s3.get_object(Bucket=bucket, Key=output_bucket_key)
    content = response["Body"].read()
    json_output = json.loads(content)
    pp.pprint(json_output)
    print('\n')
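To feed the reviewers' decisions back into a training dataset, the output document needs to be mapped back to the rows that were sent for review. The following is a minimal sketch under stated assumptions: it expects the output to contain a `humanAnswers` list whose `answerContent` holds the form fields from the template (`predicted1`, `rating1`, and so on), and it assumes that radio groups may arrive either as a string or as an object such as `{"agree": true, "disagree": false}`. Because Liquid's `forloop.index` starts at 1 while the rows were labelled from `ROW_0`, the sketch subtracts one when rebuilding the row label. The function name and the sample document are hypothetical.

```python
import re


def summarize_answers(review_output):
    """Return one record per reviewed row, ordered by row number."""
    indexed = []
    for answer in review_output.get("humanAnswers", []):
        content = answer.get("answerContent", {})
        for key, value in content.items():
            match = re.fullmatch(r"rating(\d+)", key)
            if match is None:
                continue
            index = int(match.group(1))  # 1-based, from forloop.index
            if isinstance(value, dict):
                # Assumed shape: {"agree": bool, "disagree": bool}
                agreed = bool(value.get("agree"))
            else:
                agreed = value == "agree"
            indexed.append((index, {
                "row": f"ROW_{index - 1}",  # rows were labelled from ROW_0
                "model_prediction": content.get(f"predicted{index}"),
                "reviewer_agrees": agreed,
                "worker_id": answer.get("workerId"),
            }))
    indexed.sort(key=lambda pair: pair[0])
    return [record for _, record in indexed]


# Hypothetical output document, for illustration only
sample_output = {
    "humanAnswers": [{
        "workerId": "worker-1",
        "answerContent": {
            "predicted2": "0.77",
            "rating2": {"agree": False, "disagree": True},
            "predicted1": "0.42",
            "rating1": {"agree": True, "disagree": False},
        },
    }]
}

review_records = summarize_answers(sample_output)
for record in review_records:
    print(record)
```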

### Clean up

If you are done with this notebook, please run the cells below. This removes your monitoring schedule and the hosted endpoint you created. Also, make sure to stop this notebook instance when you are done to avoid incurring charges.

In [None]:
# List the monitoring schedule
!aws sagemaker list-monitoring-schedules

In [None]:
# Copy the MonitoringScheduleName from above to provide in the delete command below
!aws sagemaker delete-monitoring-schedule --monitoring-schedule-name 'xgb-breast-cancer-a2i-blog-monitor-schedule-2020-08-19-17-59-05'
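Instead of copying the schedule name by hand, the schedules attached to an endpoint can be looked up and deleted in code. The following is a minimal sketch: the helper `delete_schedules_for_endpoint` and the `FakeSageMakerClient` stand-in are hypothetical, and the sketch reads only the first page of results (it does not follow `NextToken`). In a real run you would pass `boto3.client('sagemaker')` and your endpoint name.

```python
def delete_schedules_for_endpoint(sm_client, endpoint_name):
    """Delete every monitoring schedule listed for the endpoint; return their names."""
    response = sm_client.list_monitoring_schedules(EndpointName=endpoint_name)
    deleted = []
    for summary in response.get("MonitoringScheduleSummaries", []):
        name = summary["MonitoringScheduleName"]
        sm_client.delete_monitoring_schedule(MonitoringScheduleName=name)
        deleted.append(name)
    return deleted


class FakeSageMakerClient:
    """Hypothetical stand-in for the SageMaker client, used for a dry run."""

    def __init__(self, schedule_names):
        self.schedule_names = list(schedule_names)
        self.deleted = []

    def list_monitoring_schedules(self, EndpointName):
        return {"MonitoringScheduleSummaries": [
            {"MonitoringScheduleName": name, "EndpointName": EndpointName}
            for name in self.schedule_names
        ]}

    def delete_monitoring_schedule(self, MonitoringScheduleName):
        self.deleted.append(MonitoringScheduleName)


dry_run_sm = FakeSageMakerClient(["example-schedule-a", "example-schedule-b"])
removed = delete_schedules_for_endpoint(dry_run_sm, "example-endpoint")
print(removed)
```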

In [None]:
# Now delete the endpoint. If you get an error, try again in a couple of minutes.
# Note: SageMaker Python SDK v2 renamed this attribute to xgb_predictor.endpoint_name.
sagemaker.Session().delete_endpoint(xgb_predictor.endpoint)

## Conclusion

This notebook demonstrated how you can use Amazon SageMaker Model Monitor and Amazon A2I to set up a monitoring schedule for your Amazon SageMaker model endpoints, specify baselines that include constraint thresholds, observe inference traffic, and derive insights such as model drift, completeness, and data type violations. It also showed how to send low-confidence predictions to a human review workflow in which labelers review and update the results. The human-labeled output can be used to augment the training dataset for retraining, which helps keep the distribution variance within threshold, limit the impact of data drift, and improve model accuracy.