# Amazon SageMaker Model Monitor - Design a Compelling Record Filtering Method Using Custom Preprocessing Script

<a id='overview-0'> </a>
## [Overview](./00-Overview.ipynb)
* **[Amazon SageMaker Model Monitor - Design a Compelling Record Filtering Method Using Custom Preprocessing Script](./Data_Quality_Custom_Preprocess_Churn.ipynb)**
  * **[Business Problem](#business-problem)**
  * **[Setup](#nb0-setup)**
  * **[Deploy Pre-Trained XGBoost Model with Script-Mode](#nb0-deploy)**
  * **[Upload Required Files for SageMaker Model Monitor to S3 Location](#nb0-upload-to-s3)**
  * **[Create SageMaker Model Monitoring Schedule (Data Quality only)](#nb0-create-model-monitor)**
  * **[Test Scenarios](#nb0-test-scenarios)**
  * **[Clean-Up](#nb0-clean-up)**

<a id ='business-problem'> </a>
### Business Problem
[overview](#overview-0)

----
Continuous model monitoring, together with a strategy for retraining and updating the model, is an important step in operationalizing ML. Monitoring shows how the model is performing in production, and its outputs can be used to identify problems proactively and take corrective action to stabilize the model. In real-world production settings, however, multiple personas may interact with the model, including real users, engineers troubleshooting production issues, and bots running performance tests. In such a scenario, additional mechanisms may be required to ensure that model monitoring works as expected alongside production testing. We will demonstrate how to build a record filtering method, based on a set of business criteria, as part of the preprocessing step in [Amazon SageMaker Model Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html). The goal is to ensure that only the target records are sent to the downstream analysis steps, so that test traffic does not produce false positive violations.

<a id ='nb0-setup'> </a>
### Setup
[overview](#overview-0)

----

#### Import Libraries

In [None]:
import time
from datetime import datetime
import boto3
import random
import numpy as np
import os
import json
import pandas as pd
import pprint
import sagemaker
from sagemaker import get_execution_role
from sagemaker.xgboost.model import XGBoostModel
from sagemaker.model_monitor import DataCaptureConfig
from sagemaker.s3 import S3Uploader, S3Downloader

from src.demo_data_quality_model_monitor import DemoDataQualityModelMonitor
from src.monitoringjob_utils import run_model_monitor_job_processor
from src.artificial_traffic import ArtificialTraffic

pd.options.display.max_colwidth = None

#### Environment setup

In [None]:
sess = sagemaker.Session()
bucket = sess.default_bucket()
role = get_execution_role()
boto_session = boto3.Session()
region = boto_session.region_name

sm = boto_session.client(
    service_name = "sagemaker",
    region_name = region
)
s3_client = boto_session.client("s3")

project_name = 'DEMO_xgb_churn_prediction_monitor_with_record_filter' #change as needed
prefix = f"sagemaker/{project_name}" #change as needed
ep_prefix = 'DEMO-xgb-churn-pred-ep'
data_capture_prefix = f"{prefix}/datacapture"
s3_capture_upload_path = f"s3://{bucket}/{data_capture_prefix}"
tags = [{
    'Key': 'project',
    'Value': 'demo_xgboost_churn_prediction'
}]
print(f"project name: {project_name}")
print(f"project bucket name: {bucket}")
print(f"project S3 prefix: {prefix}")
print(f"tags: {tags}")
print(f"SageMaker DEMO Real-Time Inference Endpoint prefix: {ep_prefix}")
print(f"SageMaker Model Monitor Data Capture S3 Location: {s3_capture_upload_path}")

<a id ='nb0-deploy'> </a>
### Deploy Pre-Trained XGBoost Model with Script-Mode
[overview](#overview-0)

----

#### Upload a model artifact in a local directory to S3 location

In [None]:
#set this flag to True for the first time you run this notebook or when you want to replace the model
is_upload_model = True

model_path = 'model'
model_filename = 'model.tar.gz'
model_upload_uri = f's3://{bucket}/{prefix}/{model_path}'
local_model_path = f"./model/{model_filename}"
print(f"model s3 location: {model_upload_uri} \n")

if is_upload_model:
    S3Uploader.upload(
        local_path=local_model_path,
        desired_s3_uri=model_upload_uri
    )
else: print("skip")

In [None]:
from IPython.display import display, HTML

display(
    HTML(
        '<b>Verify <a target="blank" href="https://s3.console.aws.amazon.com/s3/buckets/{}?region={}&prefix={}/">S3 Location </a> After the S3 Copy Has Been Completed</b>'.format(
            bucket, region, f'{prefix}/{model_path}'
        )
    )
)

### Check Existing Demo Endpoint and Associated Monitor Schedule 
The [Search API](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_Search.html), with filters on name and tags, is particularly useful for this purpose.

In [None]:
max_results = 10

search_params={
   "MaxResults": max_results,
   "Resource": "Endpoint",
   "SearchExpression": { 
      "Filters": [
          { 
            "Name": f"Tags.{tags[0].get('Key')}",
            "Operator": "Equals",
            "Value": tags[0].get('Value')
          },
          { 
            "Name": "EndpointName",
            "Operator": "Contains",
            "Value": ep_prefix
          },
          { 
            "Name": "MonitoringSchedules.MonitoringScheduleName",
            "Operator": "Contains",
            "Value": ep_prefix
          }
      ]},
    "SortBy": "CreationTime",
    "SortOrder": "Descending"
}

results = sm.search(**search_params)

In [None]:
all_demo_schedule = []
all_demo_eps = []

for result in results.get('Results', []):
    endpoint = result.get('Endpoint')
    if endpoint:
        all_demo_eps.append(endpoint['EndpointName'])
        mon_schedules = endpoint.get('MonitoringSchedules', [])
        for schedule in mon_schedules:
            all_demo_schedule.append(schedule['MonitoringScheduleName'])

print(f"Found existing demo schedules: {all_demo_schedule} ") if all_demo_schedule else print(f"No existing demo schedule containing the prefix,{ep_prefix}, found ")
print(f"Found existing demo endpoints associated with monitor schedule: {all_demo_eps} ") if all_demo_eps else print(f"No existing endpoint associated with monitor schedule containing the prefix,{ep_prefix}, found ")

#### Remove Existing DEMO SageMaker Model Monitor Schedule (by tag)

In [None]:
#set this flag to False for the first time you run this notebook. Set it to True when you want to delete the demo monitor schedule
is_rmv_demo_monitor = False

if is_rmv_demo_monitor and all_demo_schedule:
    print("Deleting schedules", end="", flush=True)
    for name in all_demo_schedule:
        sm.delete_monitoring_schedule(MonitoringScheduleName=name)
        print(".", end="", flush=True)
        time.sleep(1)
else: print("skip")

#### Remove Existing DEMO SageMaker Real-Time Inference Endpoints (by tag)

In [None]:
#set this flag to False for the first time you run this notebook. Set it to True when you want to delete the demo endpoints
is_rmv_demo_eps = False

if is_rmv_demo_eps and all_demo_eps:
    print("Deleting endpoints", end="", flush=True)
    for ep in all_demo_eps:
        sm.delete_endpoint(EndpointName=ep)
        print(".", end="", flush=True)
        time.sleep(1)
else: print("skip")

#### Deploy the Model to SageMaker Real-Time Inference Endpoint or Grab the Existing One

In [None]:
#!pygmentize ./src/inference.py

In [None]:
# specify an existing demo inference endpoint name or leave it blank
current_endpoint_name = ''

# Set this to True if you want to create a new SageMaker Inference Endpoint. Default to True if no demo endpoints w/ monitor schedule found or they have been deleted and endpoint not specified
is_create_new_ep = (not(all_demo_eps) or is_rmv_demo_eps) and not(current_endpoint_name)
print(f"Create a new endpoint?: {is_create_new_ep}")

if is_create_new_ep:
    ## Configure the Data Capture
    data_capture_config = DataCaptureConfig(
        enable_capture=True, 
        sampling_percentage=100, 
        destination_s3_uri=s3_capture_upload_path
    )
    current_endpoint_name = f'{ep_prefix}-{datetime.now():%Y-%m-%d-%H-%M}'
    print(f"Create an endpoint: {current_endpoint_name}")

    xgb_inference_model = XGBoostModel(
        model_data=f'{model_upload_uri}/{model_filename}',
        role=role,
        entry_point="./src/inference.py",
        framework_version="1.2-1")
    
    predictor = xgb_inference_model.deploy(
        initial_instance_count=1,
        instance_type="ml.m5.2xlarge",
        endpoint_name=current_endpoint_name,
        data_capture_config=data_capture_config,
        tags = tags,
        wait=True)
elif not(current_endpoint_name):
    current_endpoint_name = all_demo_eps[0]
    print(f"Use existing endpoint: {current_endpoint_name}")  
else: print(f"Use selected endpoint: {current_endpoint_name}")

This may take a while. Please wait until the endpoint creation is complete.

<a id ='nb0-upload-to-s3'> </a>
### Upload Required Files for SageMaker Model Monitor to S3 Location
[overview](#overview-0)

----

#### Upload the Validation Data including header and label for SageMaker Model Monitor's Baselining Job
SageMaker will suggest a set of constraints as baseline or reference, and generate a set of summary statistics that describe these constraints.
The schemas of baseline dataset and the inference dataset should match including the number of features as well as the order of features.
We will use the validation dataset that we used to validate the model as a suitable baseline dataset.
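
One quick way to catch a schema mismatch early is to compare the baseline header with the feature order sent to the endpoint. The sketch below is illustrative only: `compare_schemas` is a hypothetical helper, and the column names are placeholders rather than the actual churn dataset columns.

```python
def compare_schemas(baseline_columns, inference_columns):
    """Return a list of human-readable schema mismatches (empty if none)."""
    problems = []
    if len(baseline_columns) != len(inference_columns):
        problems.append(
            f"column count differs: {len(baseline_columns)} vs {len(inference_columns)}"
        )
    # Compare position by position, because the order of features matters.
    for position, (expected, actual) in enumerate(zip(baseline_columns, inference_columns)):
        if expected != actual:
            problems.append(f"position {position}: expected '{expected}', got '{actual}'")
    return problems


# Placeholder column names, for illustration only.
baseline = ["label", "feature_a", "feature_b"]
inference = ["label", "feature_b", "feature_a"]
print(compare_schemas(baseline, inference))
```

An empty list means the two layouts agree in both length and order.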

In [None]:
is_upload_validation_data = True
validation_filename = 'validation-dataset-with-header.csv'
local_validation_data_path = f"data/{validation_filename}"
s3_validation_data_uri = f's3://{bucket}/{prefix}/baselining'

if is_upload_validation_data:
    S3Uploader.upload(
        local_path=local_validation_data_path,
        desired_s3_uri=s3_validation_data_uri
    )
else: print("skip")

In [None]:
from IPython.display import display, HTML

display(
    HTML(
        '<b>Verify <a target="blank" href="https://s3.console.aws.amazon.com/s3/buckets/{}?region={}&prefix={}/">S3 Upload </a> After the S3 Copy Has Been Completed</b>'.format(
            bucket, region, f'{prefix}/baselining'
        )
    )
)

#### Upload the Custom Preprocessing Script to the S3 Location
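
The contents of `src/preprocessor.py` are not reproduced in this notebook. As a rough sketch only, a record-filtering preprocessor could look like the following. It assumes the `preprocess_handler(inference_record)` entry point that Model Monitor calls for each captured record, and it assumes the caller sends custom attributes as a JSON string with a `testIndicator` field. The `is_test_record` and `make_record` helpers are hypothetical, and the stand-in record built with `SimpleNamespace` exists only so the logic can be tried locally.

```python
import json
from types import SimpleNamespace


def is_test_record(custom_attributes):
    """Return True when the record should be excluded from analysis."""
    if not custom_attributes:
        # Assumption: records without custom attributes are treated as test traffic.
        return True
    try:
        attrs = json.loads(custom_attributes[0])
    except (TypeError, ValueError):
        return True
    if not isinstance(attrs, dict):
        return True
    return str(attrs.get("testIndicator", "true")).lower() != "false"


def preprocess_handler(inference_record):
    """Entry point called once per captured record."""
    if is_test_record(inference_record.event_metadata.custom_attribute):
        return []  # an empty list drops the record from the analysis
    input_data = inference_record.endpoint_input.data.rstrip("\n")
    # Zero-padded keys keep the features in their original order.
    # A real script may also need to include the prediction to match the baseline layout.
    return {str(i).zfill(20): value for i, value in enumerate(input_data.split(","))}


def make_record(data, custom_attributes):
    """Hypothetical stand-in for the record object, for local experiments only."""
    return SimpleNamespace(
        endpoint_input=SimpleNamespace(data=data, encoding="CSV"),
        event_metadata=SimpleNamespace(custom_attribute=custom_attributes),
    )


print(preprocess_handler(make_record("1,2.5,0\n", ['{"testIndicator": "false"}'])))
print(preprocess_handler(make_record("1,2.5,0\n", ['{"testIndicator": "true"}'])))
```

Treating records with missing or malformed custom attributes as test traffic is a design choice in this sketch; the opposite default may suit other workloads better.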

In [None]:
#!pygmentize ./src/preprocessor.py

In [None]:
is_upload_preprocess_script = True

preprocessor_filename = 'preprocessor.py'
local_path_preprocessor = f"src/{preprocessor_filename}"
s3_record_preprocessor_uri = f's3://{bucket}/{prefix}/code'

if is_upload_preprocess_script:
    S3Uploader.upload(
        local_path=local_path_preprocessor,
        desired_s3_uri=s3_record_preprocessor_uri
    )
else: print("skip")

In [None]:
from IPython.display import display, HTML

display(
    HTML(
        '<b>Verify <a target="blank" href="https://s3.console.aws.amazon.com/s3/buckets/{}?region={}&prefix={}/">S3 Upload </a> After the S3 Copy Has Been Completed</b>'.format(
            bucket, region, f'{prefix}/code'
        )
    )
)

<a id ='nb0-create-model-monitor'> </a>
### Create SageMaker Model Monitoring Schedule (Data Quality only)
[overview](#overview-0)

----
We will create the baseline constraints and statistics, and the model monitoring schedule for the endpoint, in one go using a custom utility class.
Under the hood, the [DefaultModelMonitor class](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html) is used to kick off a SageMaker Processing job with the SageMaker-provided Model Monitor container, which uses Apache Spark and the open source Deequ library to generate the constraints and statistics as a baseline. After the baselining job completes, a monitoring schedule is created with the parameters you specify below.
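
The utility class itself is not shown here. As a hedged sketch of what such a wrapper might do with the SageMaker Python SDK (v2-style imports), the function below runs a baselining job and then creates an hourly schedule that uses the custom preprocessing script. The function name, instance type, volume size, runtime limit, and output sub-folders are all assumptions chosen for illustration, not values taken from `DemoDataQualityModelMonitor`.

```python
def create_data_quality_schedule(role, endpoint_name, baseline_csv_uri,
                                 preprocessor_uri, output_uri, schedule_name):
    """Illustrative only: baseline a dataset, then schedule data quality monitoring."""
    # Imported lazily so this definition can be read and loaded without AWS access.
    from sagemaker.model_monitor import CronExpressionGenerator, DefaultModelMonitor
    from sagemaker.model_monitor.dataset_format import DatasetFormat

    monitor = DefaultModelMonitor(
        role=role,
        instance_count=1,
        instance_type="ml.m5.xlarge",   # assumed instance type
        volume_size_in_gb=20,           # assumed volume size
        max_runtime_in_seconds=3600,    # assumed runtime limit
    )
    # Generate statistics.json and constraints.json from the baseline dataset.
    monitor.suggest_baseline(
        baseline_dataset=baseline_csv_uri,
        dataset_format=DatasetFormat.csv(header=True),
        output_s3_uri=f"{output_uri}/baselining",
        wait=True,
    )
    # Attach an hourly schedule that runs the record preprocessor first.
    monitor.create_monitoring_schedule(
        monitor_schedule_name=schedule_name,
        endpoint_input=endpoint_name,
        record_preprocessor_script=preprocessor_uri,
        output_s3_uri=f"{output_uri}/reports",
        statistics=monitor.baseline_statistics(),
        constraints=monitor.suggested_constraints(),
        schedule_cron_expression=CronExpressionGenerator.hourly(),
        enable_cloudwatch_metrics=True,
    )
    return monitor
```

Calling this function starts billable jobs, so it is defined here without being invoked.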

In [None]:
#!pygmentize ./src/demo_data_quality_model_monitor.py

In [None]:
demo_mon = DemoDataQualityModelMonitor(
    endpoint_name=current_endpoint_name, 
    bucket=bucket,
    projectfolder_prefix=prefix,
    training_dataset_path=f'{s3_validation_data_uri}/{validation_filename}',
    record_preprocessor_script=f'{s3_record_preprocessor_uri}/{preprocessor_filename}',
    post_analytics_processor_script=None,
    kms_key=None,
    subnets=None,
    security_group_ids=None,
    role=role,
    tags=tags)

#### Create a Data Quality Monitor Schedule

In [None]:
my_monitor = demo_mon.create_data_quality_monitor()

This may take a while.

#### Check Outputs of Baseline Suggestion

In [None]:
s3_data_quality_prefix = f'{prefix}/data_quality'
s3_data_quality_baseline_prefix = f'{s3_data_quality_prefix}/baselining'

# List the S3 URIs of the baselining outputs, then load the statistics and constraints files
report_files = S3Downloader.list(f"s3://{bucket}/{s3_data_quality_baseline_prefix}")

for filename in report_files:
    if 'statistics.json' in str(filename):
        s3_statistics_uri = filename
        schema_df = pd.json_normalize(json.loads(S3Downloader.read_file(s3_statistics_uri))["features"])
    elif 'constraints.json' in str(filename):
        s3_constraints_uri = filename
        constraints_df = pd.json_normalize(json.loads(S3Downloader.read_file(s3_constraints_uri))["features"])

In [None]:
schema_df.head()

In [None]:
constraints_df.head()

<a id ='nb0-test-scenarios'> </a>
### Test Scenarios
[overview](#overview-0)

----
We will test a few scenarios to verify if filtering based on custom attributes is working

### First Scenario: 
   1. Send a record that we know won't trigger any violations. To do this, use the `generate_artificial_traffic` method and set the `config` variable to an empty list. Also set `testIndicator` in the custom attributes to `false` to indicate that it's not a test record.
   2. Send another record that would trigger a violation. This time, pass a set of dictionaries in the `config` variable to create bogus input features as shown below, and set `testIndicator` to `true` so that this record is skipped in the analysis.
   3. Manually kick off a monitor job using the `run_model_monitor_job_processor` function from the imported utility module, providing parameters such as the S3 locations of the baseline files, data capture, and preprocessor script.
   4. In the Monitor outputs, confirm that `constraint_violations.json` shows `violations: [] 0 items` and that `dataset: item_count:` in `statistics.json` shows `1` instead of `2`.
   5. This would confirm that Model Monitor has analyzed only the non-test record.
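
The `ArtificialTraffic` helper is not shown at this point. As a sketch of the idea behind steps 1 and 2, custom attributes can be serialized to a JSON string and passed with each request through the `CustomAttributes` parameter of the SageMaker Runtime `invoke_endpoint` call. The helper names `build_custom_attributes` and `send_record`, the default content type, and the exact attribute layout are assumptions for illustration.

```python
import json
import uuid


def build_custom_attributes(application_name, test_indicator, transaction_id=None):
    """Serialize the filtering metadata that travels with an inference request."""
    attrs = {
        "applicationName": application_name,
        "testIndicator": test_indicator,
        "transactionId": transaction_id or str(uuid.uuid4()),
    }
    encoded = json.dumps(attrs)
    # CustomAttributes is limited to 1024 visible US-ASCII characters.
    if len(encoded) > 1024:
        raise ValueError("custom attributes exceed 1024 characters")
    return encoded


def send_record(endpoint_name, body, custom_attributes, content_type="text/csv"):
    """Invoke the endpoint with custom attributes attached (requires AWS access)."""
    import boto3  # imported lazily so the builder above can be tried offline

    runtime = boto3.client("sagemaker-runtime")
    return runtime.invoke_endpoint(
        EndpointName=endpoint_name,
        ContentType=content_type,
        Body=body,
        CustomAttributes=custom_attributes,
    )


print(build_custom_attributes("DEMO", "false", "example-transaction"))
```

Because the attributes are captured alongside the request, the preprocessing script can read them later and decide whether to keep or drop the record.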

In [None]:
#!pygmentize ./src/artificial_traffic.py

In [None]:
artificial_traffic = ArtificialTraffic(
    endpointName = current_endpoint_name
)
print(f'EndpointName: {artificial_traffic.endpointName}')
print(f'transaction_id: {artificial_traffic.transactionId}')

In [None]:
with open('./data/sample.json') as f:
    payload = json.load(f)
with open('./data/config.json') as f:
    sample_config = json.load(f)
sample_config

In [None]:
with open('./data/sample.json') as f:
    payload = json.load(f)

# normal payload: it should not cause any violations
artificial_traffic.generate_artificial_traffic(
    applicationName = "DEMO", 
    testIndicator = "false",
    payload=payload, 
    size=1,
    config=[]
)

## this would cause violations, but testIndicator is set to true, so the record is skipped in the analysis and no violations are reported
artificial_traffic.generate_artificial_traffic(
    applicationName="DEMO", 
    testIndicator="true",
    payload=payload, 
    size=1,
    config=sample_config['config']
)
print(f"Current Transaction Id: {artificial_traffic.transactionId}")

#### View Data Capture file in S3
It may take a minute for the data capture files to appear in S3.
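
Each capture file is in JSON Lines format, with one captured request per line. The sketch below counts how many captured records are flagged as test traffic. It assumes the documented capture layout (`captureData`, `eventMetadata`, and a `customAttributes` list holding the JSON string sent by the caller) and uses two made-up records instead of a real capture file; `count_by_test_indicator` is a hypothetical helper.

```python
import json


def count_by_test_indicator(jsonl_text):
    """Count captured records by whether they are flagged as test traffic."""
    counts = {"test": 0, "non_test": 0}
    for line in jsonl_text.splitlines():
        if not line.strip():
            continue
        record = json.loads(line)
        custom = record.get("eventMetadata", {}).get("customAttributes") or []
        attrs = json.loads(custom[0]) if custom else {}
        # Assumption: anything not explicitly marked "false" counts as test traffic.
        is_test = str(attrs.get("testIndicator", "true")).lower() != "false"
        counts["test" if is_test else "non_test"] += 1
    return counts


def make_line(test_indicator):
    """Build one made-up capture line for illustration."""
    return json.dumps({
        "captureData": {"endpointInput": {"data": "1,2,3", "encoding": "CSV"}},
        "eventMetadata": {
            "eventId": "example",
            "customAttributes": [json.dumps({"testIndicator": test_indicator})],
        },
    })


sample_lines = "\n".join([make_line("false"), make_line("true")])
print(count_by_test_indicator(sample_lines))
```

For the first scenario, a count of one test and one non-test record would match what was sent.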

In [None]:
current_endpoint_capture_prefix = f"{data_capture_prefix}/{current_endpoint_name}"
capture_files_scenario_1  = S3Downloader.list(f"s3://{bucket}/{current_endpoint_capture_prefix}")

while len(capture_files_scenario_1) == 0:
    capture_files_scenario_1  = S3Downloader.list(f"s3://{bucket}/{current_endpoint_capture_prefix}")
    if len(capture_files_scenario_1) == 0:
        time.sleep(10)

# Use the folder that contains the last listed capture file
data_capture_path_scenario_1 = capture_files_scenario_1[-1].rsplit('/', 1)[0]
print(f"\n data capture path: {data_capture_path_scenario_1}")

#### Trigger a Manual Model Monitoring Job 
SageMaker Model Monitor uses a [Processing Job](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html) under the hood, so we can trigger a monitoring job manually for testing. A utility from this [repository](https://github.com/aws-samples/reinvent2019-aim362-sagemaker-debugger-model-monitor/tree/master/02_deploy_and_monitor) already implements this, so we import it to trigger a manual Model Monitor job.

In [None]:
#!pygmentize ./src/monitoringjob_utils.py

In [None]:
s3_reports_path = f's3://{bucket}/{prefix}/reports'

print(f"S3 Location for statistics.json: {s3_statistics_uri}")
print(f"S3 Location for constraints.json: {s3_constraints_uri}")
print(f"S3 Location for report outputs: {s3_reports_path}")

In [None]:
run_model_monitor_job_processor(
    region,
    'ml.m5.xlarge',
    role,
    data_capture_path_scenario_1,
    s3_statistics_uri,
    s3_constraints_uri,
    s3_reports_path+'/scenario_1',
    preprocessor_path=f'{s3_record_preprocessor_uri}/{preprocessor_filename}'
)

#### Check the Manual Monitor Outputs

In [None]:
manual_monitor_job = sm.list_processing_jobs(
    NameContains = 'sagemaker-model-monitor-analyzer',
    SortOrder='Descending',
    MaxResults=2
)['ProcessingJobSummaries'][0]['ProcessingJobName']

manual_monitoring_job_info = sm.describe_processing_job(
    ProcessingJobName=manual_monitor_job
)

manual_monitoring_job_output = manual_monitoring_job_info['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']

print(manual_monitoring_job_output)

In [None]:
!aws s3 ls $manual_monitoring_job_output/

In [None]:
pd.read_json(f'{manual_monitoring_job_output}/constraint_violations.json')

Confirm above that there are no violations detected.

In [None]:
!aws s3 cp $manual_monitoring_job_output/statistics.json - | head

Confirm that "item_count" is 1, not 2.
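
The same check can be done programmatically. This sketch assumes `statistics.json` carries a top-level `dataset.item_count` field, as the scenario steps describe, and uses an inline sample string instead of the real S3 object; `get_item_count` is a hypothetical helper.

```python
import json


def get_item_count(statistics_json):
    """Return the number of records that Model Monitor actually analyzed."""
    stats = json.loads(statistics_json)
    return stats["dataset"]["item_count"]


# Made-up sample with the same shape as the field being checked.
sample_statistics = '{"version": 0, "dataset": {"item_count": 1}, "features": []}'
print(get_item_count(sample_statistics))
```

With the real file, the string could come from `S3Downloader.read_file` on the `statistics.json` URI under the job output path.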

### Second Scenario:        
   1. Send N records that we know will trigger violations such as `data_type_check` and `baseline_drift_check`. Set `testIndicator` in the custom attributes to `false`.
   2. In the Monitor outputs, confirm that `constraint_violations.json` shows `violations: [] 2 items` and that `dataset: item_count:` in `statistics.json` shows `1001`. The extra item is carried over from the first scenario, so this is expected.
   3. This confirms that test records sent as inference records trigger false positive violations if `testIndicator` is not set correctly.
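
To make the comparison in step 2 less manual, the violations report can be summarized by check type. The sketch below assumes the report lists entries under `violations` with a `constraint_check_type` field, and it runs on a made-up report rather than real job output; `summarize_violations` and the feature names are hypothetical.

```python
import json
from collections import Counter


def summarize_violations(violations_json):
    """Count reported violations per constraint check type."""
    report = json.loads(violations_json)
    return Counter(v["constraint_check_type"] for v in report.get("violations", []))


# Made-up report, for illustration only.
sample_report = json.dumps({
    "violations": [
        {"feature_name": "feature_a", "constraint_check_type": "data_type_check",
         "description": "placeholder description"},
        {"feature_name": "feature_b", "constraint_check_type": "baseline_drift_check",
         "description": "placeholder description"},
    ]
})
print(summarize_violations(sample_report))
```

An empty counter corresponds to the first scenario, where the test record was filtered out before analysis.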

In [None]:
artificial_traffic.generate_artificial_traffic(
    applicationName="DEMO", 
    testIndicator="false",
    payload=payload, 
    size=1000,
    config=sample_config['config']
)
print(f"Current Transaction Id: {artificial_traffic.transactionId}")

In [None]:
result_scenario_2 = s3_client.list_objects(Bucket=bucket, Prefix=current_endpoint_capture_prefix)
capture_files_scenario_2  = ['s3://{0}/{1}'.format(bucket, capture_file.get("Key")) for capture_file in result_scenario_2.get('Contents')]

print("Capture Files: ")
print("\n ".join(capture_files_scenario_2))

# Use the folder that contains the last listed capture file
data_capture_path_scenario_2 = capture_files_scenario_2[-1].rsplit('/', 1)[0]
print(f"\n data capture path: {data_capture_path_scenario_2}")

In [None]:
run_model_monitor_job_processor(region, 
                                'ml.m5.xlarge', 
                                role, 
                                data_capture_path_scenario_2, 
                                s3_statistics_uri, 
                                s3_constraints_uri, 
                                s3_reports_path+'/scenario_2',
                                preprocessor_path=f'{s3_record_preprocessor_uri}/{preprocessor_filename}')

In [None]:
manual_monitor_job = sm.list_processing_jobs(
    NameContains = 'sagemaker-model-monitor-analyzer',
    SortOrder='Descending',
    MaxResults=2
)['ProcessingJobSummaries'][0]['ProcessingJobName']

manual_monitoring_job_info = sm.describe_processing_job(
    ProcessingJobName=manual_monitor_job
)

manual_monitoring_job_output = manual_monitoring_job_info['ProcessingOutputConfig']['Outputs'][0]['S3Output']['S3Uri']

print(manual_monitoring_job_output)

In [None]:
!aws s3 ls $manual_monitoring_job_output/

In [None]:
pd.read_json(f'{manual_monitoring_job_output}/constraint_violations.json')

Confirm that there are violations detected. 

In [None]:
!aws s3 cp $manual_monitoring_job_output/statistics.json - | head

<a id ='nb0-clean-up'> </a>
### Clean-Up
[overview](#overview-0)

----
We can now delete the model monitoring schedule and the endpoint we created earlier. If you are interested, you can wait to run the following code until the scheduled monitor has kicked off. You should expect to see results similar to those from the monitor job that we kicked off manually.

In [None]:
my_monitor.delete_monitoring_schedule()

In [None]:
sm.delete_endpoint(EndpointName=current_endpoint_name)

#### Release Resources

In [None]:
%%html

<p><b>Shutting down your kernel for this notebook to release resources.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:shutdown" style="display:none;">Shutdown Kernel</button>
        
<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {}    
</script>