# Bring your own custom drift detector with Amazon SageMaker Model Monitor

This notebook shows how to:

* Host a machine learning model in Amazon SageMaker and capture inference requests, results, and metadata
* Build a Docker container to include your custom drift algorithms
* Monitor a live endpoint to detect drift
* Visualize the drift results

## Background

Amazon SageMaker provides every developer and data scientist with the ability to build, train, and deploy machine learning models quickly. It is a fully managed service that covers the entire machine learning workflow: you can label and prepare your data, choose an algorithm, train a model, and then tune and optimize it for deployment. You can then deploy your models to production to make predictions at a lower cost than was previously possible.

In addition, Amazon SageMaker enables you to capture the input, output and metadata for invocations of the models that you deploy. It also enables you to bring your own metrics to analyze the data and monitor its quality. In this notebook, you learn how Amazon SageMaker enables these capabilities.

## Setup

To get started, make sure you have completed these prerequisites:

* Specify an AWS Region to host your model.
* Have an IAM role ARN that gives Amazon SageMaker access to your data in Amazon Simple Storage Service (Amazon S3). See the documentation for how to fine-tune the required permissions.
* Create an S3 bucket to store the data used to train your model, any additional model data, and the data captured from model invocations. For demonstration purposes, this notebook uses the same bucket for all of these. In practice, you might want to separate them and apply different security policies.

In [None]:
# Handful of configuration

import os
import boto3
import json
from sagemaker import get_execution_role, session

region = boto3.Session().region_name

sm_client = boto3.client('sagemaker')

role = get_execution_role()
print("RoleArn: {}".format(role))

# You can use a different bucket, but make sure the role you chose for this notebook
# has the s3:PutObject permission. This is the bucket into which the data is captured.
bucket = session.Session(boto3.Session()).default_bucket()
print("Demo Bucket: {}".format(bucket))
prefix = 'sagemaker/DEMO-ModelMonitor'

s3_capture_upload_path = f's3://{bucket}/{prefix}/datacapture'
s3_report_path = f's3://{bucket}/{prefix}/reports'

print(f"Capture path: {s3_capture_upload_path}")
print(f"Report path: {s3_report_path}")

### Upload train and test datasets, and model file to S3

The dataset is taken from the [UCI Census Income Data Set](https://archive.ics.uci.edu/ml/datasets/adult). The task is to predict whether income exceeds $50K/yr based on census data. The dataset has been split into train and test datasets. The model was trained using XGBoost, and the model file is provided here.

The test dataset is needed later to calculate projected accuracy.

In [None]:
# S3 keys always use forward slashes, so build them with f-strings
s3_model_key = f'{prefix}/model.tar.gz'
s3_train_key = f'{prefix}/train.csv'
s3_test_key = f'{prefix}/test.csv'

s3_bucket = boto3.Session().resource('s3').Bucket(bucket)

# upload_file opens and closes each local file itself, so no handles are left open
for local_path, s3_key in [
    ('model/model.tar.gz', s3_model_key),
    ('data/train.csv', s3_train_key),
    ('data/test.csv', s3_test_key),
]:
    s3_bucket.upload_file(local_path, s3_key)

print("Success! You are all set to proceed.")

## Bring your own custom model drift detection algorithm

To bring your own custom model drift detection algorithm, you need to do the following:
* Create custom detection algorithms. This example includes algorithms under the `src` folder.
* Create a Docker container.
* Set environment variables that tell the container where to find the data captured by SageMaker Model Monitor. These variables have to match the values provided to the monitoring schedule later.
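
As a rough illustration of what a custom detection algorithm might compute, the sketch below implements a two-sample Kolmogorov-Smirnov statistic in plain Python. This is an assumption made for illustration only: the algorithms shipped in the `src` folder may use different tests, and the function name `ks_statistic` is hypothetical.

```python
from bisect import bisect_right


def ks_statistic(baseline, live):
    """Return the two-sample Kolmogorov-Smirnov statistic (largest ECDF gap).

    Hypothetical helper, not part of the notebook's src folder.
    """
    if not baseline or not live:
        raise ValueError("both samples must be non-empty")
    a, b = sorted(baseline), sorted(live)
    max_gap = 0.0
    # The gap between the two empirical CDFs can only change at observed values.
    for x in set(a) | set(b):
        ecdf_a = bisect_right(a, x) / len(a)
        ecdf_b = bisect_right(b, x) / len(b)
        max_gap = max(max_gap, abs(ecdf_a - ecdf_b))
    return max_gap


same = ks_statistic([1, 2, 3], [1, 2, 3])     # identical samples
shifted = ks_statistic([1, 2, 3], [4, 5, 6])  # fully separated samples
```

A value near 0 suggests that a feature's live distribution resembles the training baseline, while a value near 1 suggests strong drift.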

In [None]:
!pygmentize Dockerfile

### Build the container and upload it to ECR

In [None]:
from docker_utils import build_and_push_docker_image

repository_short_name = 'custom-model-monitor'

image_name = build_and_push_docker_image(repository_short_name)

## Set up the endpoint and enable data capture

The data sent to the endpoint for inference needs to be preprocessed before the XGBoost model can make predictions. The code below shows the custom input handler for the inference endpoint.

In [None]:
!pygmentize script/inference.py

### Setting up the model endpoint can take a few minutes.

In [None]:
from sagemaker.xgboost.model import XGBoostModel
from sagemaker.serializers import CSVSerializer
from sagemaker.model_monitor import DataCaptureConfig

model_url = f's3://{bucket}/{s3_model_key}'

xgb_inference_model = XGBoostModel(
    model_data=model_url,
    role=role,
    entry_point='inference.py',
    source_dir='script',
    framework_version='1.2-1',
)

data_capture_config = DataCaptureConfig(
                        enable_capture=True,
                        sampling_percentage=100,
                        destination_s3_uri=s3_capture_upload_path)

predictor = xgb_inference_model.deploy(
    initial_instance_count=1,
    instance_type="ml.c5.xlarge",
    serializer=CSVSerializer(),
    data_capture_config=data_capture_config)

## Create a monitoring schedule to detect drift on an hourly basis

The default Model Monitor can be set up to check inference data hourly against baseline metrics and report violations. This example sets up a custom model monitor instead, using Boto3 calls directly to create a monitoring schedule with the container built above. Note that the input and output paths on the container need to be configured.
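
A custom container typically starts by reading the captured requests from the local path configured for the endpoint input. The sketch below parses one capture record, assuming the capture files use the JSON Lines layout described in the SageMaker data capture documentation. The helper name `parse_capture_record` and the sample record are made up for illustration.

```python
import json


def parse_capture_record(line):
    """Extract the request, response, and timestamp from one capture line.

    Hypothetical helper; field names assume the documented capture format.
    """
    record = json.loads(line)
    capture = record["captureData"]
    return {
        "input": capture["endpointInput"]["data"],
        "output": capture["endpointOutput"]["data"],
        "inference_time": record["eventMetadata"]["inferenceTime"],
    }


# Made-up record shaped like a data capture entry.
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "39,State-gov,77516", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv", "mode": "OUTPUT",
                           "data": "0.12", "encoding": "CSV"},
    },
    "eventMetadata": {"eventId": "example-id",
                      "inferenceTime": "2021-01-01T00:00:00Z"},
    "eventVersion": "0",
})

parsed = parse_capture_record(sample_line)
```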

In [None]:
s3_train_path = f's3://{bucket}/{s3_train_key}'
s3_test_path = f's3://{bucket}/{s3_test_key}'
s3_result_path = f's3://{bucket}/{prefix}/result/{predictor.endpoint_name}'

sm_client.create_monitoring_schedule(
    MonitoringScheduleName=predictor.endpoint_name,
    MonitoringScheduleConfig={
        'ScheduleConfig': {
            'ScheduleExpression': 'cron(0 * ? * * *)'
        },
        'MonitoringJobDefinition': {
            'MonitoringInputs': [
                {
                    'EndpointInput': {
                        'EndpointName': predictor.endpoint_name,
                        'LocalPath': '/opt/ml/processing/endpointdata'
                    }
                },
            ],
            'MonitoringOutputConfig': {
                'MonitoringOutputs': [
                    {
                        'S3Output': {
                            'S3Uri': s3_result_path,
                            'LocalPath': '/opt/ml/processing/resultdata',
                            'S3UploadMode': 'EndOfJob'
                        }
                    },
                ]
            },
            'MonitoringResources': {
                'ClusterConfig': {
                    'InstanceCount': 1,
                    'InstanceType': 'ml.c5.xlarge',
                    'VolumeSizeInGB': 10
                }
            },
            'MonitoringAppSpecification': {
                'ImageUri': image_name,
                'ContainerArguments': [
                    '--train_s3_uri',
                    s3_train_path,
                    '--test_s3_uri',
                    s3_test_path,
                    '--target_label',
                    'income'
                ]
            },
            'StoppingCondition': {
                'MaxRuntimeInSeconds': 600
            },
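            # Placeholder key/value pair; replace it with the environment
            # variables your container expects, if any
            'Environment': {
                'string': 'string'
            },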
            'RoleArn': role
        }
    }
)

## Start sending pre-configured traffic to endpoint

The cell below starts a thread that sends pre-configured traffic to the endpoint at a constant rate. The data points have been pre-conditioned to contain drift, so that it can be visualized later. The traffic is sent for about 10 hours. If you want to stop the traffic earlier, stop the kernel to terminate this thread.
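
The pacing arithmetic used by the cell below can be checked in isolation. This sketch assumes a hypothetical helper, `pacing_plan`, that is not part of the notebook. It returns the delay between calls and the number of records that corresponds to one 15-minute progress message, using integer arithmetic for the latter.

```python
def pacing_plan(record_count, duration_sec=10 * 60 * 60, report_every_sec=900):
    """Return (seconds between calls, records per progress report).

    Hypothetical helper for illustration only.
    """
    if record_count <= 0:
        raise ValueError("record_count must be positive")
    sleep_time = duration_sec / record_count
    # Keep the result at 1 or more so it is always safe to use as a modulus.
    records_per_report = max(1, record_count * report_every_sec // duration_sec)
    return sleep_time, records_per_report


# Example: 7,200 records spread evenly over 10 hours.
sleep_time, records_per_report = pacing_plan(7200)
# sleep_time == 5.0 seconds, records_per_report == 180
```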

In [None]:
from threading import Thread
from time import time, sleep

def invoke_endpoint(ep_name, file_name, runtime_client):
    with open(file_name) as f:
        count = len(f.read().split('\n')) - 2 # Exclude the header and the trailing empty line
    
    # Time to sleep between inference calls to keep a constant rate of calls over 10 hours
    ten_hours_in_sec = 10*60*60
    sleep_time = ten_hours_in_sec/count
    
    with open(file_name, 'r') as f:
        next(f) # Skip header
        
        for ind, row in enumerate(f):   
            start_time = time()
            payload = row.rstrip('\n')
            response = runtime_client(data=payload)
            
            # Print progress roughly every 15 minutes (900 seconds);
            # max(1, ...) avoids a modulo-by-zero when the file has few records
            if (ind+1) % max(1, int(count/ten_hours_in_sec*900)) == 0:
                print(f'Finished sending {ind+1} records.')
            
            # Sleep to ensure constant rate. Time spent for inference is subtracted
            sleep(max(sleep_time - (time() - start_time), 0))
                
    print("Done!")
    
print(f"Sending test traffic to the endpoint {predictor.endpoint_name}. \nPlease wait...")

thread = Thread(target=invoke_endpoint, args=(predictor.endpoint_name, 'data/infer.csv', predictor.predict))
thread.start()

## Visualization

Here we provide several visualizations to capture model drift. The plots are launched in threads so that they can refresh automatically every hour.
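
A `threading.Timer` fires only once, and only after `start()` is called, so a periodic refresh has to re-arm itself on every run. The sketch below shows that pattern with a hypothetical `repeat_every` helper and a very short interval in place of one hour. It is not taken from the notebook's `src` folder.

```python
import time
from threading import Event, Timer


def repeat_every(interval_sec, action, stop_event):
    """Run action now, then re-arm a Timer until stop_event is set.

    Hypothetical helper for illustration only.
    """
    if stop_event.is_set():
        return
    action()
    timer = Timer(interval_sec, repeat_every,
                  args=(interval_sec, action, stop_event))
    timer.daemon = True  # do not keep the interpreter alive just for refreshes
    timer.start()        # a Timer does nothing until it is started


calls = []
stop = Event()
repeat_every(0.01, lambda: calls.append(1), stop)

time.sleep(0.2)  # let a few refreshes happen
stop.set()       # no further refreshes are scheduled after this
```

Passing an `Event` gives the notebook a way to stop the refresh loop without restarting the kernel.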

In [None]:
%load_ext autoreload

%autoreload 1

import sys
from threading import Timer

sys.path.append('src')

%aimport drift_visualizer
%aimport utils

### Projected accuracy

In [None]:
def plot_accuracy():
    df = utils.construct_df_from_result(s3_result_path)
    if df is not None:    
        drift_visualizer.plot_accuracy(df)
    Timer(3600, plot_accuracy).start()  # a Timer only runs once it is started
    
plot_accuracy()

### Normalized feature drift scores

In [None]:
def plot_drift_score():
    df = utils.construct_df_from_result(s3_result_path) 
    if df is not None:    
        drift_visualizer.plot_drift_score(df)
    Timer(3600, plot_drift_score).start()  # a Timer only runs once it is started
    
plot_drift_score()

### Null hypothesis of features (based on p-values)

In [None]:
def plot_p_values():
    df = utils.construct_df_from_result(s3_result_path)   
    if df is not None:            
        drift_visualizer.plot_p_values(df)
    Timer(3600, plot_p_values).start()  # a Timer only runs once it is started
    
plot_p_values()

## Clean up resources
* Delete the monitoring schedule. It needs to be deleted before the endpoint.
* Delete the endpoint
* Delete the model

In [None]:
sm_client.delete_monitoring_schedule(MonitoringScheduleName=predictor.endpoint_name)

# predictor.delete_endpoint()
# predictor.delete_model()