{
"cells": [
{
"cell_type": "markdown",
"id": "83472487",
"metadata": {},
"source": [
"# Train, deploy and monitor a XGBoost regression model with the Amazon SageMaker and alert using AWS Lambda and Amazon SNS\n",
"\n",
"[Amazon SageMaker](https://aws.amazon.com/sagemaker/) is a fully managed machine learning service. With SageMaker, you have the option of using the built-in algorithms or you can bring your own algorithms and frameworks. One of the features offered by Amazon SageMaker is [Model Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html) which continuously monitors the quality of Amazon SageMaker machine learning models in production. With Model Monitor, you can set alerts that notify you when there are deviations from the specified baseline for the specified monitoring type. \n",
"\n",
"This notebook demonstrates how to use the SageMaker's [built-in XGBoost algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) to train a regression model on the [California Housing dataset](https://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html), deploy it on a SageMaker inference endpoint and monitor using SageMaker Model Monitor. Of the various monitoring types supported by the Model Monitor, we will demonstrate [Data Quality Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-data-quality.html) and [Model Quality Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality.html) in this notebook. When you complete running through all the steps in this notebook, you will notice both Data Quality and Model Quality violations assuming you have not changed the data or the processing logic. Finally, this notebook will provide an overview of the process involved in setting up alerting using [AWS Lambda](https://aws.amazon.com/lambda/) and [Amazon SNS](https://aws.amazon.com/sns) to send out e-mail or text based alerts. A code sample for the Lambda function is also included.\n",
"\n",
"With the Data Quality Monitor, you can detect data quality drift by tracking the difference between data that was used to train the models versus the data that is being presented to the model to score.\n",
"\n",
"With the Model Quality Monitor, you can monitor model characteristics (such as MAE, MSE, RMSE, R2, precision, accuracy, recall and more) of your ML models in real time. SageMaker Model monitor reports how well a ML model is predicting outcomes by comparing model prediction to ground truth data.\n",
"\n",
"**Note:**\n",
"\n",
"* This notebook should only be run from within a SageMaker notebook instance as it references SageMaker native APIs.\n",
"* At the time of writing this notebook, the most relevant latest version of the Jupyter notebook kernel for this notebook was `conda_python3` and this came built-in with SageMaker notebooks.\n",
"* This notebook uses CPU based instances for training.\n",
"* This notebook will create resources in the same AWS account and in the same region where this notebook is running.\n",
"\n",
"**Table of Contents:**\n",
"\n",
"1. [Complete prerequisites](#Complete%20prerequisites)\n",
"\n",
" 1. [Check and configure access to the Internet](#Check%20and%20configure%20access%20to%20the%20Internet)\n",
"\n",
" 2. [Check and upgrade required software versions](#Check%20and%20upgrade%20required%20software%20versions)\n",
" \n",
" 3. [Check and configure security permissions](#Check%20and%20configure%20security%20permissions)\n",
"\n",
" 4. [Organize imports](#Organize%20imports)\n",
" \n",
" 5. [Create common objects](#Create%20common%20objects)\n",
"\n",
"2. [Prepare the data](#Prepare%20the%20data)\n",
"\n",
" 1. [Create the local directories](#Create%20the%20local%20directories)\n",
" \n",
" 2. [Load the dataset and view the details](#Load%20the%20dataset%20and%20view%20the%20details)\n",
" \n",
" 3. [(Optional) Visualize the dataset](#(Optional)%20Visualize%20the%20dataset)\n",
" \n",
" 4. [Split the dataset into train, validate and test sets](#Split%20the%20dataset%20into%20train,%20validate%20and%20test%20sets)\n",
" \n",
" 5. [Standardize the datasets](#Standardize%20the%20datasets)\n",
" \n",
" 6. [Save the prepared datasets locally](#Save%20the%20prepared%20datasets%20locally)\n",
" \n",
" 7. [Upload the prepared datasets to S3](#Upload%20the%20prepared%20datasets%20to%20S3)\n",
"\n",
"3. [Perform training](#Perform%20training)\n",
"\n",
" 1. [Set the training parameters](#Set%20the%20training%20parameters)\n",
" \n",
" 2. [(Optional) Delete previous checkpoints](#(Optional)%20Delete%20previous%20checkpoints)\n",
" \n",
" 3. [Run the training job](#Run%20the%20training%20job)\n",
"\n",
"4. [Perform deployment](#Perform%20deployment)\n",
"\n",
" 1. [Set the deployment parameters](#Set%20the%20deployment%20parameters)\n",
" \n",
" 2. [Set the data capture parameters](#Set%20the%20data%20capture%20parameters)\n",
" \n",
" 3. [(Optional) Delete previously deployed resources](#(Optional)%20Delete%20previously%20deployed%20resources)\n",
"\n",
" 4. [Deploy the model](#Deploy%20the%20model)\n",
"\n",
"5. [Create monitoring baselines](#Create%20monitoring%20baselines)\n",
"\n",
" 1. [Data Quality Monitor](#Data%20Quality%20Monitor)\n",
" \n",
" 1. [Prepare the dataset for baselining](#Prepare%20the%20dataset%20for%20baselining_DQ)\n",
" \n",
" 2. [Create the Data Quality Monitor](#Create%20the%20Data%20Quality%20Monitor_DQ)\n",
"\n",
" 3. [Generate the baseline](#Generate%20the%20baseline_DQ)\n",
" \n",
" 4. [Display the generated baseline](#Display%20the%20generated%20baseline_DQ)\n",
"\n",
" 2. [Model Quality Monitor](#Model%20Quality%20Monitor)\n",
" \n",
" 1. [Prepare the dataset for baselining](#Prepare%20the%20dataset%20for%20baselining_MQ)\n",
" \n",
" 2. [Create the Model Quality Monitor](#Create%20the%20Model%20Quality%20Monitor_MQ)\n",
" \n",
" 3. [Generate the baseline](#Generate%20the%20baseline_MQ)\n",
" \n",
" 4. [Display the generated baseline](#Display%20the%20generated%20baseline_MQ)\n",
" \n",
"6. [Schedule monitoring jobs](#Schedule%20monitoring%20jobs)\n",
"\n",
" 1. [Create Data Quality monitoring schedule](#Create%20Data%20Quality%20monitoring%20schedule)\n",
" \n",
" 2. [Create Model Quality monitoring schedule](#Create%20Model%20Quality%20monitoring%20schedule)\n",
" \n",
" 3. [Print schedule details](#Print%20schedule%20details)\n",
" \n",
" 4. [Start/stop/delete schedules](#Start/stop/delete%20schedules)\n",
"\n",
"7. [Send traffic to endpoint](#Send%20traffic%20to%20endpoint)\n",
"\n",
"8. [Analyze monitoring executions](#Analyze%20monitoring%20executions)\n",
"\n",
" 1. [View monitoring executions](#View%20monitoring%20executions)\n",
" \n",
" 2. [View latest constraint violations](#View%20latest%20constraint%20violations)\n",
"\n",
"9. [Generate alerts](#Generate%20alerts)\n",
"\n",
"10. [Cleanup](#Cleanup)\n"
]
},
{
"cell_type": "markdown",
"id": "7a13fa0a",
"metadata": {},
"source": [
"## 1. Complete prerequisites \n",
"\n",
"Check and complete the prerequisites."
]
},
{
"cell_type": "markdown",
"id": "8ab76eb0",
"metadata": {},
"source": [
"### A. Check and configure access to the Internet \n",
"\n",
"This notebook requires outbound access to the Internet to download the required software updates. You can either provide direct Internet access (default) or provide Internet access through a VPC. For more information on this, refer [here](https://docs.aws.amazon.com/sagemaker/latest/dg/appendix-notebook-and-internet-access.html)."
]
},
{
"cell_type": "markdown",
"id": "0466b86b",
"metadata": {},
"source": [
"### B. Check and upgrade required software versions \n",
"\n",
"This notebook requires:\n",
"* [SageMaker Python SDK version 2.x](https://sagemaker.readthedocs.io/en/stable/v2.html)\n",
"* [Python 3.6.x](https://www.python.org/downloads/release/python-360/)\n",
"* [Boto3](https://boto3.amazonaws.com/v1/documentation/api/latest/index.html)\n",
"\n",
"Note: If you get 'module not found' errors in the following cell, then uncomment the appropriate installation commands and install the modules. Also, uncomment and run the kernel shutdown command. When the kernel comes back, comment out the installation and kernel shutdown commands and run the following cell. Now, you should not see any errors."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8051a104",
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"import IPython\n",
"import sagemaker\n",
"import sys\n",
"\n",
"\"\"\"\n",
"Last tested versions:\n",
"SageMaker Python SDK version : 2.41.0\n",
"Python version : 3.6.13 | packaged by conda-forge | (default, Feb 19 2021, 05:36:01) \n",
"[GCC 9.3.0]\n",
"Boto3 version : 1.17.77\n",
"\"\"\"\n",
"\n",
"# Install/upgrade the Sagemaker SDK and Boto3\n",
"#!{sys.executable} -m pip install -U sagemaker boto3\n",
"#IPython.Application.instance().kernel.do_shutdown(True)\n",
"\n",
"# Get the current installed version of Sagemaker SDK, Python and Boto3\n",
"print('SageMaker Python SDK version : {}'.format(sagemaker.__version__))\n",
"print('Python version : {}'.format(sys.version))\n",
"print('Boto3 version : {}'.format(boto3.__version__))"
]
},
{
"cell_type": "markdown",
"id": "4eef1ed1",
"metadata": {},
"source": [
"### C. Check and configure security permissions \n",
"\n",
"This notebook uses the IAM role attached to the underlying notebook instance. This role should have the following permissions,\n",
"\n",
"1. Full access to the S3 bucket that will be used to store training and output data.\n",
"2. Full access to launch training instances.\n",
"3. Full access to deploy models.\n",
"4. Full access to launch monitoring instances and schedules.\n",
"5. Access to write to CloudWatch logs and metrics.\n",
"\n",
"To view the name of this role, run the following cell."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e1b410d7",
"metadata": {},
"outputs": [],
"source": [
"print(sagemaker.get_execution_role())"
]
},
{
"cell_type": "markdown",
"id": "0071ba81",
"metadata": {},
"source": [
"### D. Organize imports \n",
"\n",
"Organize all the library and module imports for later use."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "dba6b133",
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import logging\n",
"import matplotlib.pyplot as plt\n",
"import numpy as np\n",
"import os\n",
"import pandas as pd\n",
"import sklearn.model_selection\n",
"from sklearn.preprocessing import StandardScaler\n",
"from sagemaker.inputs import TrainingInput\n",
"from sagemaker.model_monitor import CronExpressionGenerator\n",
"from sagemaker.model_monitor import DataCaptureConfig\n",
"from sagemaker.model_monitor import DefaultModelMonitor\n",
"from sagemaker.model_monitor import ModelQualityMonitor\n",
"from sagemaker.model_monitor.dataset_format import DatasetFormat\n",
"from sagemaker.model_monitor.model_monitoring import EndpointInput\n",
"from sagemaker.serializers import CSVSerializer\n",
"from sagemaker.deserializers import CSVDeserializer\n",
"import datetime\n",
"import random"
]
},
{
"cell_type": "markdown",
"id": "c5942da5",
"metadata": {},
"source": [
"### E. Create common objects \n",
"\n",
"Create common objects to be used in future steps in this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "906fa7ef",
"metadata": {},
"outputs": [],
"source": [
"# Specify the S3 bucket name\n",
"s3_bucket = ''\n",
"\n",
"# Create the S3 Boto3 resource\n",
"s3_resource = boto3.resource('s3')\n",
"s3_bucket_resource = s3_resource.Bucket(s3_bucket)\n",
"\n",
"# Create the SageMaker Boto3 client\n",
"sm_client = boto3.client('sagemaker')\n",
"\n",
"# Base name to be used to create resources\n",
"nb_name = 'sm-xgboost-ca-housing-monitor-alert'\n",
"\n",
"# Names of various resources\n",
"train_job_name = 'train-{}'.format(nb_name)\n",
"model_name = 'model-{}'.format(nb_name)\n",
"endpoint_name = 'endpt-{}'.format(nb_name)\n",
"dq_base_job_name = 'dq-{}'.format(nb_name)\n",
"mq_base_job_name = 'mq-{}'.format(nb_name)\n",
"dq_baseline_job_name_prefix = 'dq-bsl-job-{}'.format(nb_name)\n",
"mq_baseline_job_name_prefix = 'mq-bsl-job-{}'.format(nb_name)\n",
"dq_mon_schedule_name = 'dq-mon-sch-{}'.format(nb_name)\n",
"mq_mon_schedule_name = 'mq-mon-sch-{}'.format(nb_name)\n",
"\n",
"# Names of local sub-directories in the notebook file system\n",
"data_dir = os.path.join(os.getcwd(), 'data/{}'.format(nb_name))\n",
"train_dir = os.path.join(os.getcwd(), 'data/{}/train'.format(nb_name))\n",
"val_dir = os.path.join(os.getcwd(), 'data/{}/validate'.format(nb_name))\n",
"test_dir = os.path.join(os.getcwd(), 'data/{}/test'.format(nb_name))\n",
"ground_truth_dir = os.path.join(os.getcwd(), 'data/{}/ground-truth'.format(nb_name))\n",
"\n",
"# Location of the datasets file in the notebook file system\n",
"dataset_csv_file = os.path.join(os.getcwd(), 'datasets/california_housing.csv')\n",
"\n",
"# Sub-folder names in S3\n",
"train_dir_s3_prefix = '{}/data/train'.format(nb_name)\n",
"val_dir_s3_prefix = '{}/data/validate'.format(nb_name)\n",
"test_dir_s3_prefix = '{}/data/test'.format(nb_name)\n",
"ground_truth_dir_s3_prefix = '{}/data/ground-truth'.format(nb_name)\n",
"\n",
"# Location in S3 where the model checkpoint will be stored\n",
"model_checkpoint_s3_path = 's3://{}/{}/checkpoint/'.format(s3_bucket, nb_name)\n",
"\n",
"# Location in S3 where the trained model will be stored\n",
"model_output_s3_path = 's3://{}/{}/output/'.format(s3_bucket, nb_name)\n",
"\n",
"# Location in S3 that will contain all model monitor related files\n",
"model_monitor_s3_path = 's3://{}/{}/model-monitor'.format(s3_bucket, nb_name)\n",
"\n",
"# Location in S3 where the data captured from the endpoint will be stored.\n",
"# Do not add a '/' at the end of the path here; it will be automatically added by the endpoint. If you add\n",
"# a '/' at the end, then the monitoring schedule will not be able to find the data capture files and your\n",
"# schedule will fail even if files are present in that S3 location.\n",
"data_capture_output_s3_path = '{}/data-capture'.format(model_monitor_s3_path)\n",
"\n",
"# Location in S3 where the results from the baseline job will be stored\n",
"dq_baseline_job_output_s3_path = '{}/data-quality/baseline/'.format(model_monitor_s3_path)\n",
"\n",
"# Location in S3 where the results from the baseline job will be stored\n",
"mq_baseline_job_output_s3_path = '{}/model-quality/baseline/'.format(model_monitor_s3_path)\n",
"\n",
"# Location in S3 where the results from the monitoring job will be stored\n",
"# Do not add a '/' at the end of the path here\n",
"dq_mon_schedule_output_s3_path = '{}/data-quality/monitoring'.format(model_monitor_s3_path)\n",
"\n",
"# Location in S3 where the results from the monitoring job will be stored\n",
"# Do not add a '/' at the end of the path here\n",
"mq_mon_schedule_output_s3_path = '{}/model-quality/monitoring'.format(model_monitor_s3_path)\n",
"\n",
"# Set the inference id prefix to consistently use it when invoking the endpoint\n",
"# at various steps in this notebook\n",
"inference_id_prefix = 'FromNotebook_'"
]
},
{
"cell_type": "markdown",
"id": "37e835fd",
"metadata": {},
"source": [
"## 2. Prepare the data \n",
"\n",
"The [California Housing dataset](https://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html) consists of 20,640 observations on housing prices with 9 economic covariates. These covariates are,\n",
"\n",
"* MedianHouseValue\n",
"* MedianIncome\n",
"* HousingMedianAge\n",
"* TotalRooms\n",
"* TotalBedrooms\n",
"* Population\n",
"* Households\n",
"* Latitude\n",
"* Longitude\n",
"\n",
"This dataset has been downloaded to the local `datasets` directory and modified as a CSV file with the feature names in the first row. This will be used in this notebook.\n",
"\n",
"The following steps will help with preparing the datasets for training, validation and testing."
]
},
{
"cell_type": "markdown",
"id": "f39b9700",
"metadata": {},
"source": [
"### A) Create the local directories \n",
"\n",
"Create the directories in the local system where the dataset will be copied to and processed."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f0bed173",
"metadata": {},
"outputs": [],
"source": [
"# Create the local directories\n",
"os.makedirs(data_dir, exist_ok=True)\n",
"os.makedirs(train_dir, exist_ok=True)\n",
"os.makedirs(val_dir, exist_ok=True)\n",
"os.makedirs(test_dir, exist_ok=True)\n",
"os.makedirs(ground_truth_dir, exist_ok=True)"
]
},
{
"cell_type": "markdown",
"id": "6ac4a262",
"metadata": {},
"source": [
"### B) Load the dataset and view the details \n",
"\n",
"Check if the CSV file exists in the `datasets` directory and load it into a Pandas DataFrame. Finally, print the details of the dataset."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "298167b3",
"metadata": {},
"outputs": [],
"source": [
"# Check if the dataset file exists and proceed\n",
"if os.path.exists(dataset_csv_file):\n",
" print('Dataset CSV file \\'{}\\' exists.'.format(dataset_csv_file))\n",
" # Load the data into a Pandas DataFrame\n",
" pd_data_frame = pd.read_csv(dataset_csv_file)\n",
" # Print the first 5 records\n",
" #print(pd_data_frame.head(5))\n",
" # Describe the dataset\n",
" print(pd_data_frame.describe())\n",
"else:\n",
" print('Dataset CSV file \\'{}\\' does not exist.'.format(dataset_csv_file))"
]
},
{
"cell_type": "markdown",
"id": "f829d749",
"metadata": {},
"source": [
"### C) (Optional) Visualize the dataset \n",
"\n",
"Display the distributions in the dataset."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "77db6854",
"metadata": {},
"outputs": [],
"source": [
"pd_data_frame.hist(bins=50, figsize=(15,15))\n",
"plt.show()"
]
},
{
"cell_type": "markdown",
"id": "0cdbc010",
"metadata": {},
"source": [
"### D) Split the dataset into train, validate and test sets \n",
"\n",
"Split the dataset into train, validate and test sets after shuffling. Split further into x and y sets."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "271d7b32",
"metadata": {},
"outputs": [],
"source": [
"# Split into train and test datasets after shuffling\n",
"train, test = sklearn.model_selection.train_test_split(pd_data_frame, test_size=0.2,\n",
" random_state=35, shuffle=True)\n",
"# Split the train dataset further into train and validation datasets after shuffling\n",
"train, val = sklearn.model_selection.train_test_split(train, test_size=0.1,\n",
" random_state=25, shuffle=True)\n",
"\n",
"# Define functions to get x and y columns\n",
"def get_x(df):\n",
" return df[['median_income','housing_median_age','total_rooms','total_bedrooms',\n",
" 'population','households','latitude','longitude']]\n",
"def get_y(df):\n",
" return df[['median_house_value']]\n",
"\n",
"# Load the x and y columns for train, validation and test datasets\n",
"x_train = get_x(train)\n",
"y_train = get_y(train)\n",
"x_val = get_x(val)\n",
"y_val = get_y(val)\n",
"x_test = get_x(test)\n",
"y_test = get_y(test)\n",
"\n",
"# Summarize the datasets\n",
"print(\"x_train shape:\", x_train.shape)\n",
"print(\"y_train shape:\", y_train.shape)\n",
"print(\"x_val shape:\", x_val.shape)\n",
"print(\"y_val shape:\", y_val.shape)\n",
"print(\"x_test shape:\", x_test.shape)\n",
"print(\"y_test shape:\", y_test.shape)"
]
},
{
"cell_type": "markdown",
"id": "b4c5b274",
"metadata": {},
"source": [
"### E) Standardize the datasets \n",
"\n",
"* Standardize the x columns of the train dataset using the `fit_transform()` function of `StandardScaler`.\n",
"* Standardize the x columns of the validate and test datasets using the `transform()` function of `StandardScaler`."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "490f0b51",
"metadata": {},
"outputs": [],
"source": [
"# Standardize the dataset\n",
"scaler = StandardScaler()\n",
"x_train = scaler.fit_transform(x_train)\n",
"x_val = scaler.transform(x_val)\n",
"x_test = scaler.transform(x_test)"
]
},
{
"cell_type": "markdown",
"id": "a85d17c0",
"metadata": {},
"source": [
"### F) Save the prepared datasets locally \n",
"\n",
"Save the prepared train, validate and test datasets to local directories. Prior to saving, concatenate x and y columns as needed. Create the directories if they don't exist."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7fc59c6d",
"metadata": {},
"outputs": [],
"source": [
"# Define function to prepend header and create the specified new file\n",
"def create_file_with_header(src_file_full_path, tgt_file_full_path, header):\n",
" with open(src_file_full_path, 'rt') as src_file, open(tgt_file_full_path, 'wt') as tgt_file:\n",
" tgt_file.write(header + '\\n')\n",
" for src_line in src_file:\n",
" tgt_file.write(src_line)\n",
"\n",
"# Save the prepared dataset (in numpy format) to the local directories as csv files\n",
"np.savetxt(os.path.join(train_dir, 'train.csv'),\n",
" np.concatenate((y_train.to_numpy(), x_train), axis=1), delimiter=',')\n",
"np.savetxt(os.path.join(train_dir, 'train_x.csv'), x_train)\n",
"create_file_with_header(os.path.join(train_dir, 'train_x.csv'),\n",
" os.path.join(train_dir, 'train_x_with_header.csv'),\n",
" 'median_income,housing_median_age,total_rooms,total_bedrooms,population,households,latitude,longitude')\n",
"np.savetxt(os.path.join(train_dir, 'train_y.csv'), y_train.to_numpy())\n",
"\n",
"np.savetxt(os.path.join(val_dir, 'validate.csv'),\n",
" np.concatenate((y_val.to_numpy(), x_val), axis=1), delimiter=',')\n",
"np.savetxt(os.path.join(val_dir, 'validate_x.csv'), x_val)\n",
"np.savetxt(os.path.join(val_dir, 'validate_y.csv'), y_val.to_numpy())\n",
"\n",
"np.savetxt(os.path.join(test_dir, 'test.csv'),\n",
" np.concatenate((y_test.to_numpy(), x_test), axis=1), delimiter=',')\n",
"np.savetxt(os.path.join(test_dir, 'test_x.csv'), x_test)\n",
"np.savetxt(os.path.join(test_dir, 'test_y.csv'), y_test.to_numpy())"
]
},
{
"cell_type": "markdown",
"id": "4384bcba",
"metadata": {},
"source": [
"### G) Upload the prepared datasets to S3 \n",
"\n",
"Upload the datasets from the local directories to appropriate sub-directories in the specified S3 bucket."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "9ddf89aa",
"metadata": {},
"outputs": [],
"source": [
"# Upload the data to S3\n",
"train_dir_s3_path = sagemaker.Session().upload_data(path='./data/{}/train/'.format(nb_name),\n",
" bucket=s3_bucket,\n",
" key_prefix=train_dir_s3_prefix)\n",
"val_dir_s3_path = sagemaker.Session().upload_data(path='./data/{}/validate/'.format(nb_name),\n",
" bucket=s3_bucket,\n",
" key_prefix=val_dir_s3_prefix)\n",
"test_dir_s3_path = sagemaker.Session().upload_data(path='./data/{}/test/'.format(nb_name),\n",
" bucket=s3_bucket,\n",
" key_prefix=test_dir_s3_prefix)\n",
"\n",
"# Capture the S3 locations for the uploaded datasets\n",
"train_s3_path = '{}/train.csv'.format(train_dir_s3_path)\n",
"train_x_s3_path = '{}/train_x.csv'.format(train_dir_s3_path)\n",
"train_y_s3_path = '{}/train_y.csv'.format(train_dir_s3_path)\n",
"train_x_with_header_s3_path = '{}/train_x_with_header.csv'.format(train_dir_s3_path)\n",
"val_s3_path = '{}/validate.csv'.format(val_dir_s3_path)\n",
"val_x_s3_path = '{}/validate_x.csv'.format(val_dir_s3_path)\n",
"val_y_s3_path = '{}/validate_y.csv'.format(val_dir_s3_path)\n",
"test_s3_path = '{}/test.csv'.format(test_dir_s3_path)\n",
"test_x_s3_path = '{}/test_x.csv'.format(test_dir_s3_path)\n",
"test_y_s3_path = '{}/test_y.csv'.format(test_dir_s3_path)"
]
},
{
"cell_type": "markdown",
"id": "5838e560",
"metadata": {},
"source": [
"## 3. Perform training \n",
"\n",
"In this step, SageMaker's [built-in XGBoost algorithm](https://docs.aws.amazon.com/sagemaker/latest/dg/xgboost.html) is used to train a regression model on the [California Housing dataset](https://www.dcc.fc.up.pt/~ltorgo/Regression/cal_housing.html).\n",
"\n",
"Note: Although you can perform validation along with training, we will not do that here. Instead, we will run through the validation dataset on the deployed model to create a baseline for the Model Monitor."
]
},
{
"cell_type": "markdown",
"id": "cfaefc19",
"metadata": {},
"source": [
"### A) Set the training parameters \n",
"\n",
"1. Inputs - S3 location of the training data.\n",
"2. Hyperparameters.\n",
"3. Training instance details:\n",
"\n",
" 1. Instance count\n",
" \n",
" 2. Instance type\n",
" \n",
" 3. The max run time of the training job\n",
" \n",
" 4. (Optional) Use Spot instances. For more info, refer [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-managed-spot-training.html).\n",
" \n",
" 5. (Optional) The max wait for Spot instances, if using Spot. This should be larger than the max run time.\n",
" \n",
"4. Base job name\n",
"5. Appropriate local and S3 directories that will be used by the training job."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "92b11af2",
"metadata": {},
"outputs": [],
"source": [
"# Set the input data input along with their content types\n",
"train_input = TrainingInput(train_s3_path, content_type='text/csv')\n",
"inputs = {'train':train_input}\n",
"\n",
"# Set the hyperparameters\n",
"hyperparameters = {\n",
" 'objective':'reg:squarederror',\n",
" 'max_depth':'6',\n",
" 'eta':'0.3',\n",
" 'alpha':'3',\n",
" 'colsample_bytree':'0.7',\n",
" 'num_round':'100'}\n",
"\n",
"# Set the instance count, instance type, volume size, options to use Spot instances and other parameters\n",
"train_instance_count = 1\n",
"train_instance_type = 'ml.m5.xlarge'\n",
"train_instance_volume_size_in_gb = 5\n",
"#use_spot_instances = True\n",
"#spot_max_wait_time_in_seconds = 5400\n",
"use_spot_instances = False\n",
"spot_max_wait_time_in_seconds = None\n",
"max_run_time_in_seconds = 3600\n",
"region_name = sagemaker.Session().boto_region_name\n",
"algorithm_name = 'xgboost'\n",
"algorithm_version = '1.2-1'\n",
"py_version = 'py37'\n",
"# Get the container image URI for the specified parameters\n",
"container_image_uri = sagemaker.image_uris.retrieve(framework=algorithm_name,\n",
" region=region_name,\n",
" version=algorithm_version,\n",
" py_version=py_version,\n",
" instance_type=train_instance_type,\n",
" image_scope='training')\n",
"\n",
"# Set the training container related parameters\n",
"container_log_level = logging.INFO\n",
"\n",
"# Location where the model checkpoints will be stored locally in the container before being uploaded to S3\n",
"model_checkpoint_local_dir = '/opt/ml/checkpoints/'\n",
"\n",
"# Location where the trained model will be stored locally in the container before being uploaded to S3\n",
"model_local_dir = '/opt/ml/model'"
]
},
{
"cell_type": "markdown",
"id": "b7009af2",
"metadata": {},
"source": [
"### B) (Optional) Delete previous checkpoints \n",
"\n",
"If model checkpoints from previous trainings are found in the S3 checkpoint location specified in the previous step, then training will resume from those checkpoints. In order to start a fresh training, run the following code cell to delete all checkpoint objects from S3."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "892cb81f",
"metadata": {},
"outputs": [],
"source": [
"# Delete the checkpoints if you want to train from the beginning; else ignore this code cell\n",
"for checkpoint_file in s3_bucket_resource.objects.filter(Prefix='{}/checkpoint/'.format(nb_name)):\n",
" checkpoint_file_key = checkpoint_file.key\n",
" print('Deleting {} ...'.format(checkpoint_file_key))\n",
" s3_resource.Object(s3_bucket_resource.name, checkpoint_file_key).delete()"
]
},
{
"cell_type": "markdown",
"id": "989b2817",
"metadata": {},
"source": [
"### C) Run the training job \n",
"\n",
"Prepare the `estimator` and call the `fit()` method. This will pull the container containing the specified version of the algorithm in the AWS region and run the training job in the specified type of EC2 instance(s). The training data will be pulled from the specified location in S3 and training results and checkpoints will be written to the specified locations in S3.\n",
"\n",
"Note: SageMaker Debugger is disabled."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e2da0162",
"metadata": {},
"outputs": [],
"source": [
"# Create the estimator\n",
"estimator = sagemaker.estimator.Estimator(\n",
" image_uri=container_image_uri,\n",
" checkpoint_local_path=model_checkpoint_local_dir,\n",
" checkpoint_s3_uri=model_checkpoint_s3_path,\n",
" model_dir=model_local_dir,\n",
" output_path=model_output_s3_path,\n",
" instance_type=train_instance_type,\n",
" instance_count=train_instance_count,\n",
" use_spot_instances=use_spot_instances,\n",
" max_wait=spot_max_wait_time_in_seconds,\n",
" max_run=max_run_time_in_seconds,\n",
" hyperparameters=hyperparameters,\n",
" role=sagemaker.get_execution_role(),\n",
" base_job_name=train_job_name,\n",
" framework_version=algorithm_version,\n",
" py_version=py_version,\n",
" container_log_level=container_log_level,\n",
" script_mode=False,\n",
" debugger_hook_config=False,\n",
" disable_profiler=True)\n",
"\n",
"# Perform the training\n",
"estimator.fit(inputs, wait=True)"
]
},
{
"cell_type": "markdown",
"id": "c9faffed",
"metadata": {},
"source": [
"## 4. Perform deployment \n",
"\n",
"In this step, we will deploy the model generated in the training step earlier. For deployment, we will use [SageMaker Python SDK version 2.x](https://sagemaker.readthedocs.io/en/stable/v2.html) and directly call the `deploy()` method on the `estimator` object used for training. This type of deployment is not recommended for anything beyond development purposes."
]
},
{
"cell_type": "markdown",
"id": "7f3774e9",
"metadata": {},
"source": [
"### A) Set the deployment parameters \n",
"\n",
"1. Deployment instance details:\n",
"\n",
" 1. Instance count\n",
" \n",
" 2. Instance type\n",
" \n",
" 3. The Elastic Inference accelerator type\n",
" \n",
"2. Serializer and deserializer - set them to CSV as the data will be in CSV format."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "36bcafe5",
"metadata": {},
"outputs": [],
"source": [
"# Set the instance count, instance type, endpoint name and other parameters\n",
"deploy_initial_instance_count = 1\n",
"deploy_instance_type = 'ml.m5.xlarge'\n",
"accelerator_type = None\n",
"serializer = CSVSerializer(content_type='text/csv')\n",
"deserializer = CSVDeserializer(encoding='utf-8', accept='text/csv')"
]
},
{
"cell_type": "markdown",
"id": "bb883f9c",
"metadata": {},
"source": [
"### B) Set the data capture parameters \n",
"\n",
"Setting up data capture will automatically capture the specified request/response data for calls made to the SageMaker inference endpoint and store them in the specified location in S3. This is required for the Model Monitor to work."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8980ceec",
"metadata": {},
"outputs": [],
"source": [
"# Set the data capture configuration\n",
"data_capture_config = DataCaptureConfig(\n",
" enable_capture = True,\n",
" sampling_percentage=100,\n",
" destination_s3_uri=data_capture_output_s3_path,\n",
" capture_options=['REQUEST', 'RESPONSE'],\n",
" csv_content_types=['text/csv'],\n",
" json_content_types=None,\n",
" sagemaker_session=sagemaker.Session())"
]
},
{
"cell_type": "markdown",
"id": "81583fd4",
"metadata": {},
"source": [
"### C) (Optional) Delete previously deployed resources \n",
"\n",
"This step deletes the model, endpoint configuration and endpoint. You may want to run this step if you are running only some parts of this notebook especially deploying after re-training the model.\n",
"\n",
"Note: You may run into errors if the model, endpoint or endpoint config does not exist. This may be due to partial deletes in the past. In this case, comment out the appropriate lines of the code and run the rest. Alternatively, you can go to the [SageMaker console](https://console.aws.amazon.com/sagemaker/home), switch to the required region and delete these resources."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "07892abe",
"metadata": {},
"outputs": [],
"source": [
"# Delete the model, endpoint configuration and endpoint\n",
"endpoint_config_name = sm_client.describe_endpoint(EndpointName=endpoint_name)['EndpointConfigName']\n",
"sm_client.delete_model(ModelName=model_name)\n",
"sm_client.delete_endpoint(EndpointName=endpoint_name)\n",
"sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)"
]
},
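{
"cell_type": "markdown",
"id": "b7e09a15",
"metadata": {},
"source": [
"As an alternative to commenting out lines after a partial delete, each delete call can be wrapped so that a missing resource is reported instead of stopping the cell. The following is a minimal sketch of that idea; `delete_if_present` and `missing_resource` are hypothetical names, and the stand-in functions only simulate the delete calls.\n",
"\n",
"```python\n",
"def delete_if_present(description, delete_call, ignorable=(Exception,)):\n",
"    # Hypothetical helper: run one delete call and report, rather than raise, on failure\n",
"    try:\n",
"        delete_call()\n",
"        print('Deleted {}'.format(description))\n",
"        return True\n",
"    except ignorable as error:\n",
"        print('Skipped {}: {}'.format(description, error))\n",
"        return False\n",
"\n",
"# Stand-in for a delete call that fails because the resource is already gone\n",
"def missing_resource():\n",
"    raise LookupError('resource not found')\n",
"\n",
"results = [delete_if_present('model', lambda: None),\n",
"           delete_if_present('endpoint', missing_resource)]\n",
"print(results)\n",
"```\n",
"\n",
"With the client used in this notebook, a call could look like `delete_if_present('model', lambda: sm_client.delete_model(ModelName=model_name), ignorable=(botocore.exceptions.ClientError,))`, assuming `botocore` is imported."
]
},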
{
"cell_type": "markdown",
"id": "82bc71fb",
"metadata": {},
"source": [
"### D) Deploy the model \n",
"\n",
"Deploy the model created in the training process to a SageMaker real-time inference endpoint using the parameters specified in the previous step.\n",
"\n",
"Note: This step automatically creates the endpoint configuration before creating the endpoint."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "7d59f9a2",
"metadata": {},
"outputs": [],
"source": [
"# Deploy the model and automatically create the endpoint configuration in the process\n",
"predictor = estimator.deploy(initial_instance_count=deploy_initial_instance_count,\n",
" instance_type=deploy_instance_type,\n",
" accelerator_type=accelerator_type,\n",
" serializer=serializer,\n",
" deserializer=deserializer,\n",
" model_name=model_name,\n",
" endpoint_name=endpoint_name,\n",
" data_capture_config=data_capture_config,\n",
" wait=True)"
]
},
{
"cell_type": "markdown",
"id": "c62d60ff",
"metadata": {},
"source": [
"## 5. Create monitoring baselines \n",
"\n",
"Prior to setting up monitoring, we need to create baselines for the various monitors. These monitors will monitor the inferences against those baselines and generate violation metrics.\n",
"\n",
"In this notebook, we will use the [Data Quality](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-data-quality.html) and [Model Quality](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality.html) monitors.\n",
"\n",
"Note: For details on how to configure the SageMaker Model Monitor, refer [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html)."
]
},
{
"cell_type": "markdown",
"id": "5a6b3f50",
"metadata": {},
"source": [
"### A) Data Quality Monitor \n",
"\n",
"For detailed information on this, refer [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-data-quality.html)."
]
},
{
"cell_type": "markdown",
"id": "f97ac923",
"metadata": {},
"source": [
"#### a. Prepare the dataset for baselining \n",
"\n",
"For creating the Data Quality baseline, we need the x columns of the dataset. For this, we recommend using the training dataset. In this notebook, this dataset file is prepared in earlier steps. So, no action is required at this point."
]
},
{
"cell_type": "markdown",
"id": "215cd6ac",
"metadata": {},
"source": [
"#### b. Create the Data Quality Monitor \n",
"\n",
"Create the Data Quality Monitor."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "bc76d974",
"metadata": {},
"outputs": [],
"source": [
"# Specify parameters\n",
"dq_instance_count = 1\n",
"dq_instance_type = 'ml.m5.xlarge'\n",
"dq_instance_volume_size_in_gb = 5\n",
"dq_max_run_time_in_seconds = 1800\n",
"\n",
"# Create the Data Quality Monitor\n",
"dq_monitor = DefaultModelMonitor(\n",
" role=sagemaker.get_execution_role(),\n",
" instance_count=dq_instance_count,\n",
" instance_type=dq_instance_type,\n",
" volume_size_in_gb=dq_instance_volume_size_in_gb,\n",
" max_runtime_in_seconds=dq_max_run_time_in_seconds,\n",
" base_job_name=dq_base_job_name,\n",
" sagemaker_session=sagemaker.Session()\n",
")"
]
},
{
"cell_type": "markdown",
"id": "77c89be7",
"metadata": {},
"source": [
"#### c. Generate the baseline \n",
"\n",
"Create the baseline job for the monitor created in the previous step and generate constraints. This will create a [Processing Job](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html) in SageMaker.\n",
"\n",
"Note: When the baseline job ends successfully, it may take a minute or two for the generated baseline files to appear in S3."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "71fa64ae",
"metadata": {},
"outputs": [],
"source": [
"# Set the other parameters\n",
"dq_baseline_job_name = '{}-{:%Y%m%d%H%M}'.format(dq_baseline_job_name_prefix, datetime.datetime.now())\n",
"dq_baseline_dataset_format = DatasetFormat.csv(header=True, output_columns_position='START')\n",
"\n",
"# Create the baseline job and generate the constraints\n",
"dq_monitor.suggest_baseline(\n",
" job_name=dq_baseline_job_name,\n",
" baseline_dataset=train_x_with_header_s3_path,\n",
" dataset_format=dq_baseline_dataset_format,\n",
" output_s3_uri=dq_baseline_job_output_s3_path,\n",
" wait=True,\n",
" logs=True\n",
")"
]
},
{
"cell_type": "markdown",
"id": "1c430e0a",
"metadata": {},
"source": [
"#### d. Display the generated baseline \n",
"\n",
"The baseline generated in the previous step will be stored in two files in the specified location in S3 - statistics and constraints. These are suggestions made by the SageMaker Model Monitor baselining job. You can edit them as required. Make sure to verify these before proceeding further.\n",
"\n",
"Run the following code cell to display them here.\n",
"\n",
"Note:\n",
"* The settings in the `monitoring_config` section of the constraints file will control how your endpoint will be monitored when you setup monitoring schedules.\n",
"* For details on Data Quality violations for various problem types, refer [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-interpreting-violations.html).\n",
"* For the schema of the statistics file, refer [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-statistics.html).\n",
"* For the schema of the constraints file, refer [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-constraints.html)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "5e082ebb",
"metadata": {},
"outputs": [],
"source": [
"# Get the latest baselining job\n",
"dq_baseline_job = dq_monitor.latest_baselining_job\n",
"\n",
"# Print the statistics\n",
"print('\\nData Quality statistics:\\n')\n",
"print(dq_baseline_job.baseline_statistics().body_dict['features'])\n",
"\n",
"# Print the constraints\n",
"print('\\nData Quality constraints:\\n')\n",
"print(dq_baseline_job.suggested_constraints().body_dict['features'])\n",
"\n",
"# Print the monitoring config\n",
"print('\\nData Quality monitoring config:\\n')\n",
"print(dq_baseline_job.suggested_constraints().body_dict['monitoring_config'])"
]
},
{
"cell_type": "markdown",
"id": "635b149a",
"metadata": {},
"source": [
"### B) Model Quality Monitor \n",
"\n",
"For detailed information on this, refer [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality.html)."
]
},
{
"cell_type": "markdown",
"id": "1e8018ae",
"metadata": {},
"source": [
"#### a. Prepare the dataset for baselining \n",
"\n",
"For creating the Model Quality baseline for a regression problem, we need to create the baseline dataset file (with header) containing two columns - predicted and actual (label) values. For this, we recommend using the validation dataset to invoke the deployed endpoint of the model. For each sample in the validation dataset, we will invoke the endpoint and capture the predicted value. Then, the list of these predicted values along with the actual values (labels) will be written to a CSV file along with the column header and uploaded to S3.\n",
"\n",
"\n",
"Note: Calling the `predict()` method on the `predictor` object will automatically capture the request/response data in S3 as per the `DataCaptureConfig` defined in the deployment step. Although this data is not required for building the baseline, it will be generated because we have configured `DataCaptureConfig` while deploying the model."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "05a9a94d",
"metadata": {},
"outputs": [],
"source": [
"# Iterate over the validate dataset and call the predictor for each row\n",
"predictions_labels_list = []\n",
"predictions_labels_list.append('prediction,label\\n')\n",
"x_val_rows = x_val.tolist()\n",
"y_val_rows = y_val.to_numpy()\n",
"for index, x_val_row in enumerate(x_val_rows, start=1):\n",
" x_val_row_string = ','.join(map(str, x_val_row))\n",
" inference_id = '{}{}'.format(inference_id_prefix, index)\n",
" predicted_object = predictor.predict(data=x_val_row_string,\n",
" target_model=None,\n",
" target_variant=None,\n",
" inference_id=inference_id)\n",
" predicted_value = float(predicted_object[0][0])\n",
" predictions_labels_list.append('{},{}\\n'.format(predicted_value, y_val_rows[index - 1][0]))\n",
"\n",
"# Write the predictions-labels file to the local directory\n",
"predictions_labels_file_name = 'predictions_labels.csv'\n",
"predictions_labels_file_path = os.path.join(val_dir, predictions_labels_file_name)\n",
"with open(predictions_labels_file_path, 'wt') as predictions_labels_file:\n",
" predictions_labels_file.write(''.join(predictions_labels_list))\n",
"\n",
"# Upload the predictions-labels data to S3\n",
"predictions_labels_s3_path = sagemaker.Session().upload_data(path='./data/{}/validate/{}'.format(nb_name,\n",
" predictions_labels_file_name),\n",
" bucket=s3_bucket,\n",
" key_prefix=val_dir_s3_prefix)"
]
},
{
"cell_type": "markdown",
"id": "0f6508fd",
"metadata": {},
"source": [
"#### b. Create the Model Quality Monitor \n",
"\n",
"Create the Model Quality Monitor."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "537a230c",
"metadata": {},
"outputs": [],
"source": [
"# Specify parameters\n",
"mq_instance_count = 1\n",
"mq_instance_type = 'ml.m5.xlarge'\n",
"mq_instance_volume_size_in_gb = 5\n",
"mq_max_run_time_in_seconds = 1800\n",
"\n",
"# Create the Model Quality Monitor\n",
"mq_monitor = ModelQualityMonitor(\n",
" role=sagemaker.get_execution_role(),\n",
" instance_count=mq_instance_count,\n",
" instance_type=mq_instance_type,\n",
" volume_size_in_gb=mq_instance_volume_size_in_gb,\n",
" max_runtime_in_seconds=mq_max_run_time_in_seconds,\n",
" base_job_name=mq_base_job_name,\n",
" sagemaker_session=sagemaker.Session()\n",
")"
]
},
{
"cell_type": "markdown",
"id": "41dddbe1",
"metadata": {},
"source": [
"#### c. Generate the baseline \n",
"\n",
"Create the baseline job for the monitor created in the previous step and generate constraints. This will create a [Processing Job](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html) in SageMaker.\n",
"\n",
"Note: When the baseline job ends successfully, it may take a minute or two for the generated baseline files to appear in S3."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "8925d25b",
"metadata": {},
"outputs": [],
"source": [
"# Set the other parameters\n",
"mq_baseline_job_name = '{}-{:%Y%m%d%H%M}'.format(mq_baseline_job_name_prefix, datetime.datetime.now())\n",
"mq_baseline_dataset_format = DatasetFormat.csv(header=True, output_columns_position='START')\n",
"mq_problem_type = 'Regression'\n",
"mq_inference_attribute = 'prediction'\n",
"mq_ground_truth_attribute = 'label'\n",
"\n",
"# Create the baseline job and generate the constraints\n",
"mq_monitor.suggest_baseline(\n",
" job_name=mq_baseline_job_name,\n",
" baseline_dataset=predictions_labels_s3_path,\n",
" dataset_format=mq_baseline_dataset_format,\n",
" output_s3_uri=mq_baseline_job_output_s3_path,\n",
" problem_type=mq_problem_type,\n",
" inference_attribute=mq_inference_attribute,\n",
" ground_truth_attribute=mq_ground_truth_attribute,\n",
" wait=True,\n",
" logs=True\n",
")"
]
},
{
"cell_type": "markdown",
"id": "307d7b58",
"metadata": {},
"source": [
"#### d. Display the generated baseline \n",
"\n",
"The baseline generated in the previous step will be stored in two files in the specified location in S3 - statistics and constraints. These are suggestions made by the SageMaker Model Monitor baselining job. You can edit them as required. Make sure to verify these before proceeding further.\n",
"\n",
"Run the following code cell to display them here.\n",
"\n",
"Note:\n",
"* The settings in the `monitoring_config` section of the constraints file will control how your endpoint will be monitored when you setup monitoring schedules.\n",
"* For details on Model Quality metrics for various problem types, refer [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-metrics.html).\n",
"* For the schema of the statistics file, refer [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-statistics.html).\n",
"* For the schema of the constraints file, refer [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-byoc-constraints.html)."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "88e7ccac",
"metadata": {},
"outputs": [],
"source": [
"# Get the latest baselining job\n",
"mq_baseline_job = mq_monitor.latest_baselining_job\n",
"\n",
"# Print the statistics\n",
"print('\\nModel Quality statistics:\\n')\n",
"print(mq_baseline_job.baseline_statistics().body_dict['regression_metrics'])\n",
"\n",
"# Print the constraints\n",
"print('\\nModel Quality constraints:\\n')\n",
"print(mq_baseline_job.suggested_constraints().body_dict['regression_constraints'])"
]
},
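{
"cell_type": "markdown",
"id": "c4d82f60",
"metadata": {},
"source": [
"For a regression problem, the `regression_metrics` section printed above is expected to contain standard error measures such as `mae`, `mse`, `rmse` and `r2`. As a reference for reading those values, the following is a minimal sketch of how these measures are conventionally computed from prediction/label pairs. It is not the Model Monitor implementation; the `regression_metrics` function and the sample pairs are illustrative assumptions.\n",
"\n",
"```python\n",
"import math\n",
"\n",
"def regression_metrics(pairs):\n",
"    # pairs: iterable of (prediction, label) tuples\n",
"    pairs = list(pairs)\n",
"    count = len(pairs)\n",
"    errors = [prediction - label for prediction, label in pairs]\n",
"    mae = sum(abs(error) for error in errors) / count\n",
"    mse = sum(error * error for error in errors) / count\n",
"    mean_label = sum(label for _, label in pairs) / count\n",
"    total_ss = sum((label - mean_label) ** 2 for _, label in pairs)\n",
"    r2 = 1.0 - sum(error * error for error in errors) / total_ss\n",
"    return {'mae': mae, 'mse': mse, 'rmse': math.sqrt(mse), 'r2': r2}\n",
"\n",
"# Made-up (prediction, label) pairs in the same order as the baseline file columns\n",
"sample_pairs = [(2.0, 2.5), (1.0, 1.0), (3.5, 3.0), (4.0, 5.0)]\n",
"metrics = regression_metrics(sample_pairs)\n",
"print(metrics)\n",
"```\n",
"\n",
"Running the same calculation on `predictions_labels.csv` gives a quick sanity check against the values in the generated statistics file."
]
},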
{
"cell_type": "markdown",
"id": "381f2b55",
"metadata": {},
"source": [
"## 6. Schedule monitoring jobs \n",
"\n",
"Amazon SageMaker Model Monitor provides you the ability to continuously monitor the data collected from the endpoints based on the schedules that you define. For more information, refer [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-scheduling.html)."
]
},
{
"cell_type": "markdown",
"id": "4ca3528c",
"metadata": {},
"source": [
"### A) Create Data Quality monitoring schedule \n",
"\n",
"In this notebook, we will use an hourly schedule."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6f6efb4b",
"metadata": {},
"outputs": [],
"source": [
"## Data Quality schedule\n",
"\n",
"# Create the monitoring schedule\n",
"dq_monitor.create_monitoring_schedule(\n",
" monitor_schedule_name=dq_mon_schedule_name,\n",
" endpoint_input=predictor.endpoint_name,\n",
" output_s3_uri=dq_mon_schedule_output_s3_path,\n",
" statistics=dq_monitor.baseline_statistics(),\n",
" constraints=dq_monitor.suggested_constraints(),\n",
" schedule_cron_expression=CronExpressionGenerator.hourly(),\n",
" enable_cloudwatch_metrics=True,\n",
")"
]
},
{
"cell_type": "markdown",
"id": "83c5b774",
"metadata": {},
"source": [
"### B) Create Model Quality monitoring schedule \n",
"\n",
"In this notebook, we will use an hourly schedule.\n",
"\n",
"Note: The values for `start_time_offset` and `end_time_offset` for `mq_monitor_schedule_endpoint_input` are dependent on the `max_runtime_in_seconds` value that was specified while creating the Model Quality Monitor. For more information on this, refer [here](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-schedule.html)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "96f72314",
"metadata": {},
"outputs": [],
"source": [
"# Ingest ground truth labels and merge them with predictions\n",
"\n",
"# Randomly set y column's value as 1,000,000 for 20% of the time.\n",
"# This will result in violations on Model Quality that you will\n",
"# observe when monitoring completes.\n",
"def generate_synthetic_ground_truth(inference_id_suffix, y_test_value):\n",
" random.seed(inference_id_suffix)\n",
" rand = random.random()\n",
" return {\n",
" 'groundTruthData': {\n",
" 'data': '1000000' if rand < 0.2 else y_test_value,\n",
" 'encoding': 'CSV'\n",
" },\n",
" 'eventMetadata': {\n",
" 'eventId': '{}{}'.format(inference_id_prefix, inference_id_suffix)\n",
" },\n",
" 'eventVersion': '0'\n",
" }\n",
"\n",
"# Iterate over the y_test dataset\n",
"synthetic_ground_truth_list = []\n",
"y_test_rows = y_test.values.tolist()\n",
"for index, y_test_row in enumerate(y_test_rows, start=1):\n",
" synthetic_ground_truth_list.append(json.dumps(generate_synthetic_ground_truth(index, str(y_test_row[0]))))\n",
" \n",
"# Write the synthetic ground truth file to the local directory\n",
"synthetic_ground_truth_file_name = 'synthetic_ground_truth.jsonl'\n",
"synthetic_ground_truth_file_path = os.path.join(ground_truth_dir, synthetic_ground_truth_file_name)\n",
"with open(synthetic_ground_truth_file_path, 'wt') as synthetic_ground_truth_file:\n",
" synthetic_ground_truth_file.write('\\n'.join(synthetic_ground_truth_list))\n",
" \n",
"# Upload the synthetic ground truth file to S3\n",
"synthetic_ground_truth_s3_path_suffix = datetime.datetime.now().strftime('/%Y/%m/%d/%H')\n",
"sagemaker.Session().upload_data(path='./data/{}/ground-truth/{}'.format(nb_name, synthetic_ground_truth_file_name),\n",
" bucket=s3_bucket, key_prefix='{}{}'.format(ground_truth_dir_s3_prefix,\n",
" synthetic_ground_truth_s3_path_suffix))\n",
"synthetic_ground_truth_s3_path_prefix = 's3://{}/{}'.format(s3_bucket, ground_truth_dir_s3_prefix)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "25c2f0a6",
"metadata": {},
"outputs": [],
"source": [
"## Model Quality schedule\n",
"\n",
"# Create the endpoint input\n",
"mq_monitor_schedule_endpoint_input = EndpointInput(endpoint_name=predictor.endpoint_name,\n",
" destination='/opt/ml/processing/mq_monitor/input_data',\n",
" inference_attribute='0',\n",
" start_time_offset='-PT1H',\n",
" end_time_offset='-PT0H')\n",
"# Create the monitoring schedule\n",
"mq_monitor.create_monitoring_schedule(\n",
" monitor_schedule_name=mq_mon_schedule_name,\n",
" endpoint_input=mq_monitor_schedule_endpoint_input,\n",
" problem_type=mq_problem_type,\n",
" ground_truth_input=synthetic_ground_truth_s3_path_prefix,\n",
" output_s3_uri=mq_mon_schedule_output_s3_path,\n",
" constraints=mq_monitor.suggested_constraints(),\n",
" schedule_cron_expression=CronExpressionGenerator.hourly(),\n",
" enable_cloudwatch_metrics=True\n",
")"
]
},
{
"cell_type": "markdown",
"id": "4141e461",
"metadata": {},
"source": [
"### C) Print schedule details "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "31dcd07f",
"metadata": {},
"outputs": [],
"source": [
"## Data Quality details\n",
"# Describe and print status\n",
"dq_monitor_schedule_details = dq_monitor.describe_schedule()\n",
"print('\\nData Quality Monitor - schedule details:\\n')\n",
"print(dq_monitor_schedule_details)\n",
"print('\\nData Quality Monitor - schedule status: {}'.format(dq_monitor_schedule_details['MonitoringScheduleStatus']))\n",
"\n",
"## Model Quality details\n",
"# Describe and print status\n",
"mq_monitor_schedule_details = mq_monitor.describe_schedule()\n",
"print('\\nModel Quality Monitor - schedule details:\\n')\n",
"print(mq_monitor_schedule_details)\n",
"print('\\nModel Quality Monitor - schedule status: {}'.format(mq_monitor_schedule_details['MonitoringScheduleStatus']))"
]
},
{
"cell_type": "markdown",
"id": "066350d6",
"metadata": {},
"source": [
"### D) Start/stop/delete schedules "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0c6d672a",
"metadata": {},
"outputs": [],
"source": [
"## Data Quality schedule controls\n",
"#dq_monitor.start_monitoring_schedule()\n",
"#dq_monitor.stop_monitoring_schedule()\n",
"#dq_monitor.delete_monitoring_schedule()\n",
"\n",
"## Model Quality schedule controls\n",
"#mq_monitor.start_monitoring_schedule()\n",
"#mq_monitor.stop_monitoring_schedule()\n",
"#mq_monitor.delete_monitoring_schedule()"
]
},
{
"cell_type": "markdown",
"id": "e9048601",
"metadata": {},
"source": [
"## 7. Send traffic to endpoint \n",
"\n",
"In this step, we will send traffic to the endpoint by calling the `predict()` method on the `predictor` object. This will automatically capture the request/response data in S3 as per the `DataCaptureConfig` defined in the deployment step. Based on the schedules, each of the monitors that we defined in the earlier steps will read this data and compare with their baselines. If any violations are found, a `constraint_violations.json` file will be generated by each monitor.\n",
"\n",
"In this step, you can introduce data quality and model quality issues by changing the x columns being sent to the `predictor`. This will help in understanding the capabilities of the monitors in detail.\n",
"\n",
"This noteboook will use the test dataset to send the traffic for inference. This step will iterate through the test dataset and complete when done. This means that the monitors may not see captured data in every monitoring cycle and will report the status of the monitoring execution as 'Failed'. If you like data to be generated continuously, you can modify this code but keep in mind that you will incur unnecessary costs if you forget to stop this after your testing."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6bd2b7a1",
"metadata": {},
"outputs": [],
"source": [
"# When you send this traffic to the endpoint, you will observe both Data Quality\n",
"# and Model Quality violations when monitoring completes. This is based on the\n",
"# assumption that you have not changed the data or the processing logic in this\n",
"# notebook.\n",
"\n",
"# Iterate over the test data and call the predictor for each row\n",
"x_test_rows = x_test.tolist()\n",
"for index, x_test_row in enumerate(x_test_rows, start=1):\n",
" x_test_row_string = ','.join(map(str, x_test_row))\n",
" # Auto-generate an inference-id to track the request/response in the captured data\n",
" inference_id = '{}{}'.format(inference_id_prefix, index)\n",
" predicted_object = predictor.predict(data=x_test_row_string,\n",
" target_model=None,\n",
" target_variant=None,\n",
" inference_id=inference_id)\n",
" predicted_value = float(predicted_object[0][0])\n",
" print('{}: Prediction for \\'{}\\' is {}'.format(inference_id, x_test_row_string, predicted_value))"
]
},
{
"cell_type": "markdown",
"id": "2fe3ba87",
"metadata": {},
"source": [
"## 8. Analyze monitoring executions \n",
"\n",
"You can list the executions of the monitoring schedules and review the status of each of the executions.\n",
"\n",
"Note:\n",
"* With the hourly schedule that we configured in the previous step, the monitor will look for new data capture files in S3 for the previous hour.\n",
"* If no files are found, then the status of that execution will be reported as 'Failed'. This can happen if no traffic was sent to the endpoint in the previous hour.\n",
"* If the execution status is reported as 'CompletedWithViolations', you should see the corresponding violations file in S3 in the path specified in previous steps.\n",
"* If configured in prior steps, you can view metrics in Amazon CloudWatch. For info on this, refer [Data Quality - CloudWatch metrics](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-interpreting-cloudwatch.html) and [Model Quality - CloudWatch metrics](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor-model-quality-cw.html)"
]
},
{
"cell_type": "markdown",
"id": "3f810bad",
"metadata": {},
"source": [
"### A) View monitoring executions "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "d296c444",
"metadata": {},
"outputs": [],
"source": [
"# List monitoring schedules for endpoint\n",
"print('Monitoring schedules for endpoint \\'{}\\':\\n'.format(endpoint_name))\n",
"print(sm_client.list_monitoring_schedules(EndpointName=endpoint_name))"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "101ca0ef",
"metadata": {},
"outputs": [],
"source": [
"## Data Quality Monitor - monitoring executions\n",
"print('Monitoring executions for schedule \\'{}\\':\\n'.format(dq_mon_schedule_name))\n",
"print(sm_client.list_monitoring_executions(MonitoringScheduleName=dq_mon_schedule_name,\n",
" EndpointName=predictor.endpoint_name,\n",
" MonitoringTypeEquals='DataQuality',\n",
" SortBy='CreationTime',\n",
" SortOrder='Descending'))\n"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "a5e1074d",
"metadata": {},
"outputs": [],
"source": [
"## Model Quality Monitor - monitoring executions\n",
"print('Monitoring executions for schedule \\'{}\\':\\n'.format(mq_mon_schedule_name))\n",
"print(sm_client.list_monitoring_executions(MonitoringScheduleName=mq_mon_schedule_name,\n",
" EndpointName=predictor.endpoint_name,\n",
" MonitoringTypeEquals='ModelQuality',\n",
" SortBy='CreationTime',\n",
" SortOrder='Descending'))"
]
},
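{
"cell_type": "markdown",
"id": "d91b3e7a",
"metadata": {},
"source": [
"The raw responses printed above can be long. The following is a minimal sketch of how the execution statuses could be tallied, assuming the response contains a `MonitoringExecutionSummaries` list whose items carry a `MonitoringExecutionStatus` field. The `count_statuses` helper and the sample response are illustrative assumptions.\n",
"\n",
"```python\n",
"from collections import Counter\n",
"\n",
"# Made-up response in the shape returned by list_monitoring_executions\n",
"sample_response = {\n",
"    'MonitoringExecutionSummaries': [\n",
"        {'MonitoringScheduleName': 'example-schedule',\n",
"         'MonitoringExecutionStatus': 'CompletedWithViolations'},\n",
"        {'MonitoringScheduleName': 'example-schedule',\n",
"         'MonitoringExecutionStatus': 'Failed', 'FailureReason': 'example reason'},\n",
"        {'MonitoringScheduleName': 'example-schedule',\n",
"         'MonitoringExecutionStatus': 'CompletedWithViolations'}\n",
"    ]\n",
"}\n",
"\n",
"def count_statuses(response):\n",
"    # Hypothetical helper: count executions per status\n",
"    summaries = response.get('MonitoringExecutionSummaries', [])\n",
"    return dict(Counter(summary['MonitoringExecutionStatus'] for summary in summaries))\n",
"\n",
"status_counts = count_statuses(sample_response)\n",
"print(status_counts)\n",
"```\n",
"\n",
"The same helper can be applied to the actual response of `sm_client.list_monitoring_executions(...)`."
]
},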
{
"cell_type": "markdown",
"id": "2e146604",
"metadata": {},
"source": [
"### B) View latest constraint violations "
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "53eaa485",
"metadata": {},
"outputs": [],
"source": [
"## Data Quality Monitor\n",
"dq_constraint_violations = dq_monitor.latest_monitoring_constraint_violations()\n",
"if dq_constraint_violations != None:\n",
" print(dq_constraint_violations.body_dict)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "50432770",
"metadata": {},
"outputs": [],
"source": [
"## Model Quality Monitor\n",
"mq_constraint_violations = mq_monitor.latest_monitoring_constraint_violations()\n",
"if mq_constraint_violations != None:\n",
" print(mq_constraint_violations.body_dict)"
]
},
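{
"cell_type": "markdown",
"id": "e25f6c08",
"metadata": {},
"source": [
"The violations body printed above is a dictionary with a `violations` list. As a minimal sketch, the entries could be grouped by subject and check type as shown below. It assumes that Data Quality entries carry a `feature_name` and Model Quality entries carry a `metric_name`; the sample body, its values and the `summarize_violations` helper are illustrative assumptions.\n",
"\n",
"```python\n",
"from collections import Counter\n",
"\n",
"# Made-up body in the shape of a constraint_violations.json file\n",
"sample_violations = {\n",
"    'violations': [\n",
"        {'feature_name': 'feature_a', 'constraint_check_type': 'baseline_drift_check',\n",
"         'description': 'example description'},\n",
"        {'feature_name': 'feature_b', 'constraint_check_type': 'data_type_check',\n",
"         'description': 'example description'},\n",
"        {'metric_name': 'mae', 'constraint_check_type': 'GreaterThanThreshold',\n",
"         'description': 'example description'}\n",
"    ]\n",
"}\n",
"\n",
"def summarize_violations(body):\n",
"    # Hypothetical helper: count violations per (subject, check type) pair\n",
"    counts = Counter()\n",
"    for violation in body.get('violations', []):\n",
"        subject = violation.get('feature_name') or violation.get('metric_name') or 'unknown'\n",
"        counts[(subject, violation['constraint_check_type'])] += 1\n",
"    return dict(counts)\n",
"\n",
"summary = summarize_violations(sample_violations)\n",
"print(summary)\n",
"```\n",
"\n",
"With real results, pass `dq_constraint_violations.body_dict` or `mq_constraint_violations.body_dict` instead of the sample."
]
},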
{
"cell_type": "markdown",
"id": "48b5901f",
"metadata": {},
"source": [
"## 9. Generate alerts \n",
"\n",
"You can generate alerts based on various conditions of the monitoring executions based on your requirements.\n",
"\n",
"One of the common conditions for which alerts would be preferred are when violations are generated. You can automate alerting on this by configuring an [AWS Lambda](https://aws.amazon.com/lambda/) function to trigger when the `constraint_violations.json` file is generated in the specified S3 location by the monitors. In this function, you can write your alerting logic. The following code cell has a sample code for this.\n",
"\n",
"For more information on configuring Amazon S3 as the event source for AWS Lambda, refer [here](https://docs.aws.amazon.com/lambda/latest/dg/with-s3.html).\n",
"\n",
"A common approach for alerting would involve using [Amazon SNS](https://aws.amazon.com/sns) to send out e-mail or text based alerts. In the alerting logic in your Lambda function, you can use the [Boto3 API for SNS](https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sns.html) to invoke SNS from Lambda.\n",
"\n",
"The overall flow would be like this,\n",
"\n",
"Violations file generated in S3 --> Lambda --> SNS --> E-mail/text"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "e107db06",
"metadata": {},
"outputs": [],
"source": [
"!cat scripts/lambda_violations_processor.py"
]
},
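{
"cell_type": "markdown",
"id": "f60a7b39",
"metadata": {},
"source": [
"Independently of the script printed above, the S3 --> Lambda --> SNS flow can be sketched as follows. This is a minimal illustration under stated assumptions and not the contents of `scripts/lambda_violations_processor.py`: the `SNS_TOPIC_ARN` environment variable, the `extract_violation_files` helper, the injectable `publish` parameter and the sample event are all hypothetical.\n",
"\n",
"```python\n",
"import os\n",
"import urllib.parse\n",
"\n",
"def extract_violation_files(event):\n",
"    # Collect (bucket, key) pairs for violations files from an S3 event notification\n",
"    files = []\n",
"    for record in event.get('Records', []):\n",
"        bucket = record['s3']['bucket']['name']\n",
"        # Object keys arrive URL-encoded in S3 event notifications\n",
"        key = urllib.parse.unquote_plus(record['s3']['object']['key'])\n",
"        if key.endswith('constraint_violations.json'):\n",
"            files.append((bucket, key))\n",
"    return files\n",
"\n",
"def lambda_handler(event, context, publish=None):\n",
"    # publish is injectable for local testing; by default it is the SNS publish call\n",
"    if publish is None:\n",
"        import boto3\n",
"        publish = boto3.client('sns').publish\n",
"    files = extract_violation_files(event)\n",
"    for bucket, key in files:\n",
"        publish(TopicArn=os.environ.get('SNS_TOPIC_ARN', ''),  # hypothetical variable name\n",
"                Subject='SageMaker Model Monitor violations',\n",
"                Message='Violations file generated: s3://{}/{}'.format(bucket, key))\n",
"    return {'alerts_sent': len(files)}\n",
"\n",
"# Local dry run with a made-up event and a stand-in publisher\n",
"sent = []\n",
"sample_event = {'Records': [{'s3': {'bucket': {'name': 'example-bucket'},\n",
"                                    'object': {'key': 'monitor/output/constraint_violations.json'}}}]}\n",
"result = lambda_handler(sample_event, None, publish=lambda **kwargs: sent.append(kwargs))\n",
"print(result, sent[0]['Message'])\n",
"```\n",
"\n",
"Passing `publish` explicitly keeps the handler testable without AWS credentials; when deployed, Lambda calls the handler with only `event` and `context`, so the SNS client is used."
]
},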
{
"cell_type": "markdown",
"id": "7224355a",
"metadata": {},
"source": [
"## 10. Cleanup \n",
"\n",
"As a best practice, you should delete resources and S3 objects when no longer required. This will help you avoid incurring unncessary costs.\n",
"\n",
"This step will cleanup the resources and S3 objects created by this notebook."
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "250eeb51",
"metadata": {},
"outputs": [],
"source": [
"# Delete the monitoring schedules\n",
"dq_monitor.delete_monitoring_schedule()\n",
"mq_monitor.delete_monitoring_schedule()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "0d3c1801",
"metadata": {},
"outputs": [],
"source": [
"# Delete the model, endpoint configuration and endpoint\n",
"predictor.delete_model()\n",
"predictor.delete_endpoint()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "cb38fe6a",
"metadata": {},
"outputs": [],
"source": [
"# Delete data from S3 bucket\n",
"for file in s3_bucket_resource.objects.filter(Prefix='{}/'.format(nb_name)):\n",
" file_key = file.key\n",
" print('Deleting {} ...'.format(file_key))\n",
" s3_resource.Object(s3_bucket_resource.name, file_key).delete()"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "13d49d66",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_python3",
"language": "python",
"name": "conda_python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.13"
}
},
"nbformat": 4,
"nbformat_minor": 5
}