# Amazon SageMaker Model Monitoring 

ML monitoring is a critical MLOps capability for reducing risk and running safe, reliable production machine learning systems at scale. SageMaker contains several integrated services such as [SageMaker Model Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html) and [SageMaker Clarify](https://aws.amazon.com/sagemaker/clarify/) to monitor models for data and model quality, bias, and feature attribution drift.

Amazon SageMaker Model Monitor continuously monitors the quality of Amazon SageMaker machine learning models in production. With Model Monitor, you can set alerts that notify you when there are deviations in model quality. Early and proactive detection of these deviations enables you to take corrective actions, such as retraining models, auditing upstream systems, or fixing quality issues, without having to monitor models manually or build additional tooling. Model Monitor integrates with SageMaker Clarify to provide pre-built and extendable monitors so you can get started with monitoring your ML models faster.

In this lab, you will learn how to:
 * Capture inference requests, results, and metadata from the model deployed by your pipeline.
 * Schedule a model monitor that checks model performance on a regular cadence.

While each monitor requires task-specific configurations, the standardized monitoring setup workflow you will follow is:

1. Initialize a monitoring object
2. Configure and run a baseline job that establishes the reference against which production results are compared
3. Schedule continuous monitoring

The goal of this lab is for you to walk through the code and understand how to get started with monitoring your machine learning models with SageMaker Model Monitor.

## Setup

In [None]:
!pip install "sagemaker>=2.123.0"

In [None]:
from datetime import datetime, timedelta
import pandas as pd
import time
import csv
import json
import boto3
import sagemaker

region = boto3.Session().region_name
sagemaker_session = sagemaker.session.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()

sagemaker_client = sagemaker_session.sagemaker_client
sagemaker_runtime_client = sagemaker_session.sagemaker_runtime_client

from sagemaker.predictor import Predictor
from sagemaker.serializers import CSVSerializer

from sagemaker.clarify import (
 BiasConfig,
 DataConfig,
 ModelConfig,
 ModelPredictedLabelConfig,
 SHAPConfig,
)

from sagemaker.model_monitor import (
 BiasAnalysisConfig,
 CronExpressionGenerator,
 DataCaptureConfig,
 EndpointInput,
 ExplainabilityAnalysisConfig,
 ModelBiasMonitor,
 ModelExplainabilityMonitor,
 DefaultModelMonitor,
 ModelQualityMonitor,
)

from sagemaker.model_monitor.dataset_format import DatasetFormat

from sagemaker.s3 import S3Downloader, S3Uploader

In [None]:
print(f"AWS region: {region}")
# A different bucket can be used, but make sure the role for this notebook has
# the s3:PutObject permissions. This is the bucket into which the data is captured.
print(f"S3 Bucket: {default_bucket}")

# Endpoint metadata.
# Note: you will use the staging endpoint from the previous lab, just as you would in a real scenario, to verify your
# monitoring setup before deploying it on production endpoints.
endpoint_name = "workshop-project-staging"
endpoint_instance_count = 1
endpoint_instance_type = "ml.m5.large"
print(f"Endpoint: {endpoint_name}")

prefix = "sagemaker/xgboost-dm-model-monitoring"
s3_key = f"s3://{default_bucket}/{prefix}"
print(f"S3 key: {s3_key}")

s3_capture_upload_path = f"{s3_key}/data_capture"
s3_ground_truth_upload_path = f"{s3_key}/ground_truth_data/{datetime.now():%Y-%m-%d-%H-%M-%S}"
s3_baseline_results_path = f"{s3_key}/baselines"
s3_report_path = f"{s3_key}/reports"

print(f"Capture path: {s3_capture_upload_path}")
print(f"Ground truth path: {s3_ground_truth_upload_path}")
print(f"Baselines path: {s3_baseline_results_path}")
print(f"Report path: {s3_report_path}")

sm_client = boto3.client('sagemaker')

endpoint_config = sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointConfigName']
model_name = sm_client.describe_endpoint_config(EndpointConfigName=endpoint_config)['ProductionVariants'][0]['ModelName']

print("Model Name : ", model_name)

## Configure data capture and generate synthetic traffic

Data quality monitoring automatically monitors machine learning (ML) models in production and notifies you when data quality issues arise. ML models in production have to make predictions on real-life data that is not carefully curated like most training datasets. If the statistical nature of the data that your model receives while in production drifts away from the nature of the baseline data it was trained on, the model begins to lose accuracy in its predictions. Amazon SageMaker Model Monitor uses rules to detect data drift and alerts you when it happens.
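The drift idea can be made concrete with a simplified illustration, not Model Monitor's own rule set: compare a baseline sample of one feature with a production sample using the two-sample Kolmogorov-Smirnov (KS) statistic, which is the largest gap between their empirical cumulative distribution functions. The feature values and the 0.3 alert threshold are hypothetical.

```python
from bisect import bisect_right


def ecdf(sorted_values, x):
    """Fraction of values <= x in an already-sorted list."""
    return bisect_right(sorted_values, x) / len(sorted_values)


def ks_statistic(baseline, production):
    """Two-sample KS statistic: largest absolute gap between the empirical CDFs."""
    b, p = sorted(baseline), sorted(production)
    # Both ECDFs are step functions, so the largest gap occurs at an observed value.
    return max(abs(ecdf(b, x) - ecdf(p, x)) for x in b + p)


# Hypothetical values of one numeric feature, e.g. customer age.
baseline_ages = [25, 31, 35, 38, 41, 44, 47, 52, 56, 60]
production_ages = [48, 52, 55, 58, 61, 63, 66, 70, 72, 75]

DRIFT_THRESHOLD = 0.3  # hypothetical alerting threshold
stat = ks_statistic(baseline_ages, production_ages)
print(f"KS statistic: {stat:.2f}, drift detected: {stat > DRIFT_THRESHOLD}")
```

A statistic near 0 means the two samples are distributed alike; a value near 1 means they barely overlap.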

### Initialize SageMaker Predictor for real-time requests to previously deployed model endpoint

In [None]:
# Create a Predictor Python object for real-time endpoint requests. https://sagemaker.readthedocs.io/en/stable/api/inference/predictors.html
predictor = Predictor(endpoint_name=endpoint_name, serializer=CSVSerializer())

**If you have previously run `sagemaker-data-quality-monitoring.ipynb`, you can skip the next two cells.**

In [None]:
# # The endpoint deployed in a prior lab already has data capture enabled, so SageMaker created
# # a DataCaptureConfig for it at deployment time. The code below illustrates how to create a custom
# # DataCaptureConfig with data capture enabled and apply it to an existing endpoint.
# data_capture_config = DataCaptureConfig(
#     enable_capture=True,
#     sampling_percentage=100,
#     destination_s3_uri=s3_capture_upload_path,
# )

In [None]:
# # Now update endpoint with data capture enabled and provide an s3_capture_upload_path.
# predictor.update_data_capture_config(data_capture_config)

Note: updating your endpoint's data capture configuration can take 3-5 minutes. A progress bar is displayed in the cell above and indicates completion with `---------------!` and the cell execution number. While the update is in progress, your endpoint status under SageMaker resources > Endpoints is `Updating`; it changes to `InService` when your updated endpoint is ready for requests.

### Invoke the deployed model endpoint to generate predictions

Now send data to this endpoint to get inferences in real time. 

With data capture enabled in the previous step, the request and response payload, along with some additional metadata, is saved to the S3 location specified in `DataCaptureConfig`.

In [None]:
# Read in training set for schema and to compute feature attribution baselines.
train_df = pd.read_csv("train-headers.csv")

In [None]:
# Use test set to create a file without headers and labels to mirror data format at inference time.
test_df = pd.read_csv("test.csv", header = None)
test_df.drop(test_df.columns[0], axis=1, inplace=True)
test_df.sample(180).to_csv('test-samples-no-header.csv', header = False, index = None)

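Now send the 180 test samples to the model endpoint. `predictor.predict` serializes them into a single CSV payload, one row per sample, so they arrive as one request. The inputs are captured along with the endpoint's output predictions and written to your `s3_capture_upload_path`.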

In [None]:
print("Sending test traffic to the endpoint {}. \nPlease wait...".format(endpoint_name))

test_sample_df = pd.read_csv('test-samples-no-header.csv', header = None, index_col = False)

response = predictor.predict(data=test_sample_df.to_numpy())

print("Done!")
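To see why the 180 rows travel as one request, the sketch below mimics what a CSV serializer does with a 2D array: it writes one CSV line per row into a single payload string. It uses only the standard `csv` module and approximates, rather than reproduces, SageMaker's `CSVSerializer`; the rows are made-up values.

```python
import csv
import io


def rows_to_csv_payload(rows):
    """Serialize a 2D sequence into one CSV string, one line per row."""
    buffer = io.StringIO()
    # lineterminator="\n" avoids csv's default "\r\n" line endings.
    csv.writer(buffer, lineterminator="\n").writerows(rows)
    return buffer.getvalue().rstrip("\n")


rows = [[0.1, 2, 3.5], [0.4, 1, 2.0], [0.9, 0, 1.5]]  # hypothetical feature rows
payload = rows_to_csv_payload(rows)
print(payload)
print("records in one request:", len(payload.split("\n")))
```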

### View captured data

List the data capture files stored in Amazon S3. 

Capture files are organized in S3 by the hour in which each invocation occurred, so you should see files from different time periods under paths of the form:

`s3://{destination-bucket-prefix}/{endpoint-name}/{AllTraffic or model-variant-name}/yyyy/mm/dd/hh/filename.jsonl`
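The path layout above can be reproduced with a small helper, which is handy when you only want one hour of captures. A minimal sketch, assuming the hour component is expressed in UTC; `capture_prefix` and the bucket name are hypothetical.

```python
from datetime import datetime, timezone


def capture_prefix(destination, endpoint_name, variant, when):
    """Return the S3 prefix that holds capture files for the hour containing `when`."""
    return f"{destination.rstrip('/')}/{endpoint_name}/{variant}/{when:%Y/%m/%d/%H}/"


prefix = capture_prefix(
    "s3://my-bucket/sagemaker/xgboost-dm-model-monitoring/data_capture",  # hypothetical bucket
    "workshop-project-staging",
    "AllTraffic",
    datetime(2023, 5, 1, 14, 30, tzinfo=timezone.utc),  # assumed to be a UTC timestamp
)
print(prefix)
```

Such a prefix could then be passed to `S3Downloader.list` to list a single hour of captures.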

In [None]:
print("Waiting up to 60 seconds for captures to show up", end="")

capture_files = []
for _ in range(60):
    capture_files = sorted(S3Downloader.list(f"{s3_capture_upload_path}/{endpoint_name}"))
    if capture_files:
        break
    print(".", end="", flush=True)
    time.sleep(1)

print("\nFound Capture Files:")
print("\n ".join(capture_files[-10:]))

Next, view the contents of the most recent capture file by reading its last few lines and printing the final record.

In [None]:
capture_file = S3Downloader.read_file(capture_files[-1]).split("\n")[-10:-1]
print(capture_file[-1])

Here is the same record pretty-printed as JSON:

In [None]:
print(json.dumps(json.loads(capture_file[-1]), indent=2))
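To work with captured records programmatically rather than by eye, you can pull the request and response payloads out of each JSON line. The sketch below assumes the field layout seen in the printed record (`captureData.endpointInput` and `endpointOutput`, each with `data` and `encoding`, plus `eventMetadata`). The sample line is hypothetical, and the base64 branch is an assumption about how non-text payloads are stored.

```python
import base64
import json


def decode_field(field):
    """Return the payload text of an endpointInput/endpointOutput field."""
    data = field["data"]
    # Assumption: non-text payloads are stored base64-encoded.
    if field.get("encoding") == "BASE64":
        data = base64.b64decode(data).decode("utf-8")
    return data


def parse_capture_line(line):
    record = json.loads(line)
    capture = record["captureData"]
    return {
        # inferenceId is assumed to appear only when the request set InferenceId.
        "inference_id": record["eventMetadata"].get("inferenceId"),
        "input": decode_field(capture["endpointInput"]),
        "output": decode_field(capture["endpointOutput"]),
    }


# Hypothetical capture line for a single-row request sent with InferenceId "7".
sample_line = json.dumps({
    "captureData": {
        "endpointInput": {"observedContentType": "text/csv", "mode": "INPUT",
                          "data": "34,1,0,0", "encoding": "CSV"},
        "endpointOutput": {"observedContentType": "text/csv; charset=utf-8",
                           "mode": "OUTPUT", "data": "0.87", "encoding": "CSV"},
    },
    "eventMetadata": {"eventId": "hypothetical-uuid", "inferenceId": "7",
                      "inferenceTime": "2023-05-01T14:30:00Z"},
    "eventVersion": "0",
})
print(parse_capture_line(sample_line))
```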

### Generate synthetic traffic

In order to review SageMaker's continuous monitoring capabilities, you will start a thread to generate synthetic traffic to send to the deployed model endpoint. 

The `WorkerThread` class will run continuously on the notebook kernel to generate predictions that are captured and sent to S3 until the kernel is restarted or the thread is explicitly terminated. 

See the cell in the `Cleanup` section to terminate the threads.

This step is necessary because if there is no traffic, the monitoring jobs are marked as `Failed` since there is no data to process.

The following cell extends Python's `threading.Thread` class so that you can terminate the thread later without restarting the notebook kernel.

In [None]:
import threading

class WorkerThread(threading.Thread):
    """Thread that calls do_run repeatedly until terminate() is called."""

    def __init__(self, do_run, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.__do_run = do_run
        self.__terminate_event = threading.Event()

    def terminate(self):
        # Signal the loop in run() (and the do_run callback) to stop.
        self.__terminate_event.set()

    def run(self):
        while not self.__terminate_event.is_set():
            self.__do_run(self.__terminate_event)

Now you define a function that your thread will invoke continuously to send test samples to the model endpoint.

In [None]:
def invoke_endpoint(terminate_event):
    with open("test-samples-no-header.csv", "r") as f:
        for i, row in enumerate(f):
            payload = row.rstrip("\n")
            response = sagemaker_runtime_client.invoke_endpoint(
                EndpointName=endpoint_name,
                ContentType="text/csv",
                Body=payload,
                InferenceId=str(i),  # ID per row (repeats each pass); matched to the ground truth eventId
            )
            response["Body"].read()
            time.sleep(1)
            if terminate_event.is_set():
                break


# Keep invoking the endpoint with test data
invoke_endpoint_thread = WorkerThread(do_run=invoke_endpoint)
invoke_endpoint_thread.start()

### Generate synthetic ground truth data

Besides data capture, model monitoring execution also requires ground truth data.

In real use cases, ground truth data should be regularly collected and uploaded to a designated S3 location.

The code below generates synthetic ground truth data. The first-party merge container combines the captured data with the ground truth data, and the merged data is passed to the model quality monitoring job for analysis. As with data capture, the model quality monitoring execution fails if there is no data to merge.

In [None]:
import random

def ground_truth_with_id(inference_id):
    # Seed with the ID so the same ID always gets the same label.
    random.seed(inference_id)
    rand = random.random()
    # Format required by the merge container.
    return {
        "groundTruthData": {
            # Randomly generate positive labels 70% of the time.
            "data": "1" if rand < 0.7 else "0",
            "encoding": "CSV",
        },
        "eventMetadata": {
            "eventId": str(inference_id),
        },
        "eventVersion": "0",
    }


def upload_ground_truth(upload_time):
    # 180 is the number of rows in the data we send for inference.
    records = [ground_truth_with_id(i) for i in range(180)]
    fake_records = [json.dumps(r) for r in records]
    data_to_upload = "\n".join(fake_records)
    target_s3_uri = f"{s3_ground_truth_upload_path}/{upload_time:%Y/%m/%d/%H/%M%S}.jsonl"
    print(f"Uploading {len(fake_records)} records to", target_s3_uri)
    S3Uploader.upload_string_as_file_body(data_to_upload, target_s3_uri)
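Conceptually, the merge step joins each captured inference with the ground truth record whose `eventId` matches the request's `InferenceId`. The following is a simplified, hypothetical illustration of that join in plain Python, not the merge container's code; `merge_by_id` and the sample values are made up. When no IDs match, the result is empty, which corresponds to the case where the monitoring execution has nothing to analyze.

```python
import json


def merge_by_id(captured, ground_truth_lines):
    """Join captured predictions with ground truth labels on the inference ID.

    captured: dict mapping inference ID -> predicted probability (as text).
    ground_truth_lines: JSON Lines records in the ground_truth_with_id format.
    """
    labels = {}
    for line in ground_truth_lines:
        record = json.loads(line)
        labels[record["eventMetadata"]["eventId"]] = record["groundTruthData"]["data"]
    # Only IDs present on both sides can be compared; the rest are dropped.
    return [
        (inference_id, float(probability), int(labels[inference_id]))
        for inference_id, probability in captured.items()
        if inference_id in labels
    ]


captured = {"0": "0.91", "1": "0.12", "2": "0.66"}  # hypothetical captured outputs
ground_truth = [
    json.dumps({"groundTruthData": {"data": "1", "encoding": "CSV"},
                "eventMetadata": {"eventId": "0"}, "eventVersion": "0"}),
    json.dumps({"groundTruthData": {"data": "0", "encoding": "CSV"},
                "eventMetadata": {"eventId": "1"}, "eventVersion": "0"}),
]
print(merge_by_id(captured, ground_truth))
```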

In [None]:
# Generate data for the last hour.
upload_ground_truth(datetime.utcnow() - timedelta(hours=1))

In [None]:
# You can also use the WorkerThread class to keep generating synthetic ground truth data once an hour.
def generate_fake_ground_truth(terminate_event):
    upload_ground_truth(datetime.utcnow())
    # Wait an hour in one-minute steps so a terminate request is noticed promptly.
    for _ in range(60):
        time.sleep(60)
        if terminate_event.is_set():
            break


ground_truth_thread = WorkerThread(do_run=generate_fake_ground_truth)
ground_truth_thread.start()
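Each hourly upload reuses IDs 0-179, matching the `InferenceId` values that `invoke_endpoint` cycles through, and seeding the random generator with the ID means every upload assigns the same label to the same ID. The sketch below checks both properties with `synthetic_label`, a hypothetical standalone copy of the labeling rule in `ground_truth_with_id`: labels are reproducible, and roughly 70% are positive.

```python
import random


def synthetic_label(inference_id, positive_rate=0.7):
    """Same labeling rule as ground_truth_with_id, returning only the label."""
    # Seeding with the ID makes the label for a given ID identical on every upload.
    random.seed(inference_id)
    return "1" if random.random() < positive_rate else "0"


first_upload = [synthetic_label(i) for i in range(180)]
second_upload = [synthetic_label(i) for i in range(180)]
share = first_upload.count("1") / len(first_upload)
print("uploads identical:", first_upload == second_upload)
print(f"share of positive labels: {share:.2f}")
```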

## Monitor model quality

Model quality monitoring jobs monitor the performance of a model by comparing the predictions that the model makes with the actual ground truth labels that the model attempts to predict. To do this, model quality monitoring merges data that is captured from real-time inference with actual labels stored in S3, and then compares the predictions with the actual labels.
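Once predictions and labels are merged, the comparison boils down to standard binary classification metrics. A minimal sketch in plain Python, assuming a 0.5 cut-off and treating a probability equal to the threshold as positive; Model Monitor reports a broader set of metrics, and its exact computations may differ.

```python
def binary_metrics(pairs, threshold=0.5):
    """Compute basic binary classification metrics from (probability, label) pairs."""
    tp = fp = tn = fn = 0
    for probability, label in pairs:
        predicted = 1 if probability >= threshold else 0
        if predicted == 1 and label == 1:
            tp += 1
        elif predicted == 1 and label == 0:
            fp += 1
        elif predicted == 0 and label == 0:
            tn += 1
        else:
            fn += 1
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    f1 = 2 * precision * recall / (precision + recall) if precision + recall else 0.0
    return {"accuracy": (tp + tn) / len(pairs), "precision": precision,
            "recall": recall, "f1": f1}


# Hypothetical merged (probability, ground truth label) pairs.
pairs = [(0.91, 1), (0.12, 0), (0.66, 0), (0.35, 1), (0.80, 1)]
print(binary_metrics(pairs))
```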

### Define `ModelQualityMonitor`

First, define and configure a [`ModelQualityMonitor`](https://sagemaker.readthedocs.io/en/stable/api/inference/model_monitor.html#sagemaker.model_monitor.model_monitoring.ModelQualityMonitor) object.

In [None]:
model_quality_monitor = ModelQualityMonitor(
 role=role,
 instance_count=1,
 instance_type='ml.m5.xlarge',
 volume_size_in_gb=20,
 max_runtime_in_seconds=1800,
 sagemaker_session=sagemaker_session
)

### Run model quality baseline job

Next, you run a model quality baseline job. As input, you need to provide a validation or test dataset with model predictions to establish a model performance baseline. For convenience and illustration, you are provided with `validation-with-predictions.csv`, which has the columns `probability`, `prediction`, and `label`, for the `ModelQualityMonitor` to compute a performance baseline. In a real production environment, you should consider a feedback mechanism such as [SageMaker Augmented AI](https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-use-augmented-ai-a2i-human-review-loops.html) for error analysis and for creating the ground truth needed to set a model performance baseline. Note that you need to provide at least 200 samples to compute standard deviations for the model performance metrics.
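Before launching a baseline job, it can help to sanity-check the dataset locally. The sketch below assumes a CSV with a header row and the three columns named above, and flags missing columns or fewer than 200 rows; `check_baseline_csv` is a hypothetical helper, not part of the SageMaker SDK.

```python
import csv
import io

REQUIRED_COLUMNS = ("probability", "prediction", "label")


def check_baseline_csv(text, min_rows=200):
    """Return a list of problems found in a baseline CSV; an empty list means none found."""
    reader = csv.DictReader(io.StringIO(text))
    header = reader.fieldnames or []
    problems = [f"missing column: {name}" for name in REQUIRED_COLUMNS if name not in header]
    row_count = sum(1 for _ in reader)
    if row_count < min_rows:
        problems.append(f"only {row_count} rows; at least {min_rows} needed")
    return problems


# Hypothetical three-row sample in the expected layout.
sample = "probability,prediction,label\n0.91,1,1\n0.12,0,0\n0.66,1,0\n"
print(check_baseline_csv(sample))
```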

Call the `suggest_baseline` method of the `ModelQualityMonitor` object to run a baseline job.

Note: this step can take about 8-10 min.

In [None]:
model_quality_baseline_job_name = f"ModelQualityBaselineJob-{datetime.utcnow():%Y-%m-%d-%H%M}"
model_quality_baseline_job_result_uri = f"{s3_baseline_results_path}/model_quality"

model_quality_baseline_job = model_quality_monitor.suggest_baseline(
 job_name=model_quality_baseline_job_name,
 baseline_dataset="validation-with-predictions.csv", # Local path or S3 URI of the validation dataset.
 dataset_format=DatasetFormat.csv(header=True),
 output_s3_uri=model_quality_baseline_job_result_uri, # The S3 location to store the results.
 problem_type="BinaryClassification",
 inference_attribute="prediction", # The column in the dataset that contains predictions.
 probability_attribute="probability", # The column in the dataset that contains probabilities.
 ground_truth_attribute="label" # The column in the dataset that contains ground truth labels.
)

model_quality_baseline_job.wait(logs=False)

View the suggested model quality baseline constraints.

In [None]:
latest_model_quality_baseline_job = model_quality_monitor.latest_baselining_job
pd.DataFrame(latest_model_quality_baseline_job.suggested_constraints().body_dict["binary_classification_constraints"]).T
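The suggested constraints pair each metric with a `threshold` and a `comparison_operator`. Assuming a constraint is violated when the observed metric satisfies its operator against the threshold (for example, recall falling below a `LessThanThreshold` bound), a violation check can be sketched as follows. The metric values and thresholds are hypothetical, and only two operator names are mapped.

```python
import operator

# Hypothetical constraints shaped like the suggested binary_classification_constraints.
constraints = {
    "recall": {"threshold": 0.75, "comparison_operator": "LessThanThreshold"},
    "precision": {"threshold": 0.70, "comparison_operator": "LessThanThreshold"},
}

# Only two operator names are mapped here; extend the table for others you encounter.
OPERATORS = {"LessThanThreshold": operator.lt, "GreaterThanThreshold": operator.gt}


def find_violations(metrics, constraints):
    """List constraints whose metric satisfies the comparison, i.e. is violated."""
    violations = []
    for name, rule in constraints.items():
        if name not in metrics:
            continue
        compare = OPERATORS[rule["comparison_operator"]]
        if compare(metrics[name], rule["threshold"]):
            violations.append(
                f"{name}={metrics[name]:.3f} {rule['comparison_operator']} {rule['threshold']}"
            )
    return violations


print(find_violations({"recall": 0.68, "precision": 0.81}, constraints))
```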

### Schedule continuous model quality monitoring

You can create a model monitoring schedule for the endpoint created earlier.

The schedule compares the real-time traffic against the baseline constraints once an hour.
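`CronExpressionGenerator.hourly()` schedules an execution at the top of every hour, and executions commonly start up to about 20 minutes after the hour. The sketch below estimates the window for the next execution; `next_hourly_window` is a hypothetical helper, and the 20-minute delay is a rule of thumb, not a guarantee.

```python
from datetime import datetime, timedelta, timezone


def next_hourly_window(now, max_start_delay_minutes=20):
    """Return (scheduled, latest_expected_start) for the next top-of-hour execution."""
    scheduled = now.replace(minute=0, second=0, microsecond=0) + timedelta(hours=1)
    return scheduled, scheduled + timedelta(minutes=max_start_delay_minutes)


now = datetime(2023, 5, 1, 14, 37, tzinfo=timezone.utc)  # hypothetical current time
scheduled, latest = next_hourly_window(now)
print(scheduled.isoformat(), latest.isoformat())
```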

In [None]:
model_quality_monitor_schedule_name = (
 f"xgboost-dm-model-monitoring-schedule-{datetime.utcnow():%Y-%m-%d-%H%M}"
)

In [None]:
# Create an EndpointInput configuration.
endpointInput = EndpointInput(
 endpoint_name=predictor.endpoint_name,
 probability_attribute="0",
 probability_threshold_attribute=0.5,
 destination="/opt/ml/processing/input_data",
)
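In this `EndpointInput`, `probability_attribute="0"` points the monitor at the first field of each CSV response (the XGBoost endpoint returns one score per row), and `probability_threshold_attribute=0.5` converts that probability into a predicted label. A minimal sketch of that conversion for a single response line; `predicted_label` is hypothetical, and the `>=` comparison at the threshold is an assumption.

```python
def predicted_label(response_line, probability_index=0, threshold=0.5):
    """Turn one CSV response line into a 0/1 label, mimicking the EndpointInput settings."""
    # probability_attribute="0" -> take the first comma-separated field.
    probability = float(response_line.split(",")[probability_index])
    # Assumption: a probability exactly at the threshold counts as positive.
    return 1 if probability >= threshold else 0


for line in ["0.87", "0.12", "0.5"]:
    print(line, "->", predicted_label(line))
```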

In [None]:
# Define a monitoring schedule.
response = model_quality_monitor.create_monitoring_schedule(
 monitor_schedule_name=model_quality_monitor_schedule_name,
 endpoint_input=endpointInput,
 output_s3_uri=f"{s3_report_path}/model_quality",
 problem_type="BinaryClassification",
 ground_truth_input=s3_ground_truth_upload_path,
 constraints=latest_model_quality_baseline_job.suggested_constraints(),
 schedule_cron_expression=CronExpressionGenerator.hourly(),
 enable_cloudwatch_metrics=True,
)

In [None]:
# Check the model monitor was created.
predictor.list_monitors()

In [None]:
# You will see the monitoring schedule in the 'Scheduled' status.
model_quality_monitor.describe_schedule()

In [None]:
# Initially there will be no executions, since the first execution happens at the top of the hour.
# Note that it is common for an execution to start up to 20 minutes after the hour.
executions = model_quality_monitor.list_executions()
executions[:5]

## Cleanup

Well done! If you are finished with the notebook, run the following cells to terminate lab resources and prevent continued charges.

First, stop the worker threads.

In [None]:
invoke_endpoint_thread.terminate()
ground_truth_thread.terminate()

Finally, stop and then delete all monitoring schedules attached to the endpoint.

If the following cell throws an error similar to `ClientError: An error occurred (ValidationException) when calling the DeleteMonitoringSchedule operation: can't delete schedule as it has in-progress executions`, wait a few minutes and run this cell again. You can't delete a monitoring schedule while one of its monitoring jobs is executing; once the job finishes, you can delete the schedule.

In [None]:
model_monitors = predictor.list_monitors()

for monitor in model_monitors:
 monitor.stop_monitoring_schedule()
 monitor.delete_monitoring_schedule()
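Rather than re-running the cleanup cell by hand when a schedule still has in-progress executions, you could wrap the delete call in a retry loop. A generic sketch, demonstrated with a hypothetical `flaky_delete` stand-in so it runs without AWS; `retry` is not a SageMaker API.

```python
import time


def retry(action, attempts=5, delay_seconds=60, retry_on=(Exception,)):
    """Call action() until it succeeds, sleeping between failed attempts."""
    for attempt in range(1, attempts + 1):
        try:
            return action()
        except retry_on as error:
            if attempt == attempts:
                raise  # give up and surface the last error
            print(f"Attempt {attempt} failed ({error}); retrying in {delay_seconds}s")
            time.sleep(delay_seconds)


# Hypothetical stand-in that fails twice, like a schedule with in-progress executions.
calls = {"count": 0}


def flaky_delete():
    calls["count"] += 1
    if calls["count"] < 3:
        raise RuntimeError("can't delete schedule as it has in-progress executions")
    return "deleted"


result = retry(flaky_delete, delay_seconds=0)
print(result, "after", calls["count"], "attempts")
```

In the notebook, you might call `retry(monitor.delete_monitoring_schedule, retry_on=(ClientError,))` with `ClientError` imported from `botocore.exceptions`. That catches every `ClientError`, not only the in-progress case, so keep `attempts` small.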