{ "cells": [ { "cell_type": "markdown", "id": "f58e135e", "metadata": {}, "source": [ "# Amazon SageMaker Model Monitoring " ] }, { "cell_type": "markdown", "id": "ad4f3da1", "metadata": {}, "source": [ "ML Monitoring is a critical MLOps capability to reduce risk and manage safe and reliable production machine learning systems at scale. SageMaker contains several integrated services such as [SageMaker Model Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html) and [SageMaker Clarify](https://aws.amazon.com/sagemaker/clarify/) to monitor models for data and model quality, bias, and feature attribution drift.\n", "\n", "Amazon SageMaker Model Monitor continuously monitors the quality of Amazon SageMaker machine learning models in production. With Model Monitor, you can set alerts that notify you when there are deviations in the model quality. Early and proactive detection of these deviations enables you to take corrective actions, such as retraining models, auditing upstream systems, or fixing quality issues without having to monitor models manually or build additional tooling. Model Monitor integrates with SageMaker Clarify to provide pre-built and extendable monitors to get start with monitoring your ML models faster." ] }, { "cell_type": "markdown", "id": "1e71b485", "metadata": {}, "source": [ "In this lab, you will learn how to:\n", " * Capture inference requests, results, and metadata from our pipeline deployed model.\n", " * Schedule a data monitor to monitor for data drift on a regular schedule." ] }, { "cell_type": "markdown", "id": "b0db42c6", "metadata": {}, "source": [ "While each monitor requires task-specific configurations, the standardized monitoring setup workflow you will follow is:\n", "\n", "1. Initialize a monitoring object\n", "2. Configure and run a baseline job to contrastively compare results\n", "3. Schedule continuous monitoring\n", "\n", "The goal of this lab is that you walk through the code and understand how get started with monitoring your machine learning models with SageMaker Model Monitor. 
" ] }, { "cell_type": "markdown", "id": "eabf1363", "metadata": { "tags": [] }, "source": [ "## Setup" ] }, { "cell_type": "code", "execution_count": null, "id": "306c6cf2", "metadata": {}, "outputs": [], "source": [ "!pip install \"sagemaker>=2.123.0\"" ] }, { "cell_type": "code", "execution_count": null, "id": "d157757f", "metadata": {}, "outputs": [], "source": [ "from datetime import datetime, timedelta\n", "import pandas as pd\n", "import time\n", "import csv\n", "import json\n", "import boto3\n", "import sagemaker\n", "\n", "region = boto3.Session().region_name\n", "sagemaker_session = sagemaker.session.Session()\n", "role = sagemaker.get_execution_role()\n", "default_bucket = sagemaker_session.default_bucket()\n", "\n", "sagemaker_client = sagemaker_session.sagemaker_client\n", "sagemaker_runtime_client = sagemaker_session.sagemaker_runtime_client\n", "\n", "from sagemaker.predictor import Predictor\n", "from sagemaker.serializers import CSVSerializer\n", "\n", "from sagemaker.clarify import (\n", " BiasConfig,\n", " DataConfig,\n", " ModelConfig,\n", " ModelPredictedLabelConfig,\n", " SHAPConfig,\n", ")\n", "\n", "from sagemaker.model_monitor import (\n", " BiasAnalysisConfig,\n", " CronExpressionGenerator,\n", " DataCaptureConfig,\n", " EndpointInput,\n", " ExplainabilityAnalysisConfig,\n", " ModelBiasMonitor,\n", " ModelExplainabilityMonitor,\n", " DefaultModelMonitor,\n", " ModelQualityMonitor,\n", ")\n", "\n", "from sagemaker.model_monitor.dataset_format import DatasetFormat\n", "\n", "from sagemaker.s3 import S3Downloader, S3Uploader" ] }, { "cell_type": "code", "execution_count": null, "id": "40741cbe", "metadata": {}, "outputs": [], "source": [ "print(f\"AWS region: {region}\")\n", "# A different bucket can be used, but make sure the role for this notebook has\n", "# the s3:PutObject permissions. This is the bucket into which the data is captured.\n", "print(f\"S3 Bucket: {default_bucket}\")\n", "\n", "# Endpoint metadata.\n", "# Note: you will use the staging endpoint from the previously lab just as you would in a real scenario to verify your monitoring\n", "# setup before deploying your setup on production endpoints.\n", "endpoint_name = \"workshop-project-staging\"\n", "endpoint_instance_count = 1\n", "endpoint_instance_type = \"ml.m5.large\"\n", "print(f\"Endpoint: {endpoint_name}\")\n", "\n", "prefix = \"sagemaker/xgboost-dm-model-monitoring\"\n", "s3_key = f\"s3://{default_bucket}/{prefix}\"\n", "print(f\"S3 key: {s3_key}\")\n", "\n", "s3_capture_upload_path = f\"{s3_key}/data_capture\"\n", "s3_baseline_results_path = f\"{s3_key}/baselines\"\n", "s3_report_path = f\"{s3_key}/reports\"\n", "\n", "print(f\"Capture path: {s3_capture_upload_path}\")\n", "print(f\"Baselines path: {s3_baseline_results_path}\")\n", "print(f\"Report path: {s3_report_path}\")\n", "\n", "sm_client = boto3.client('sagemaker')\n", "\n", "endpoint_config = sm_client.describe_endpoint(EndpointName = endpoint_name)['EndpointConfigName']\n", "model_name = sm_client.describe_endpoint_config(EndpointConfigName = endpoint_config)['ProductionVariants'][0]['ModelName']\n", "\n", "print(\"Model Name : \", model_name)" ] }, { "cell_type": "markdown", "id": "28a30849", "metadata": { "tags": [] }, "source": [ "## Configure data capture and generate synthetic traffic" ] }, { "cell_type": "markdown", "id": "e5f63184", "metadata": {}, "source": [ "Data quality monitoring automatically monitors machine learning (ML) models in production and notifies you when data quality issues arise. 
{ "cell_type": "markdown", "id": "28a30849", "metadata": { "tags": [] }, "source": [ "## Configure data capture and generate synthetic traffic" ] }, { "cell_type": "markdown", "id": "e5f63184", "metadata": {}, "source": [ "Model Monitor analyzes the traffic your endpoint serves, so the first step is to capture it. Data capture saves the request and response payloads from your endpoint, along with some additional metadata, to the Amazon S3 location you specify. In this section you enable data capture on the endpoint and then generate traffic for the monitoring jobs to process." ] }, { "cell_type": "markdown", "id": "400ea127", "metadata": {}, "source": [ "### Initialize a SageMaker Predictor for real-time requests to the previously deployed model endpoint" ] }, { "cell_type": "code", "execution_count": null, "id": "bc19dde9", "metadata": {}, "outputs": [], "source": [ "# Create a Predictor Python object for real-time endpoint requests. https://sagemaker.readthedocs.io/en/stable/api/inference/predictors.html\n", "predictor = Predictor(endpoint_name=endpoint_name, serializer=CSVSerializer())" ] }, { "cell_type": "code", "execution_count": null, "id": "863a9094", "metadata": {}, "outputs": [], "source": [ "# A DataCaptureConfig was created automatically when your model was deployed to an endpoint\n", "# with data capture enabled in a prior lab. The cells below illustrate how to create a custom\n", "# DataCaptureConfig with data capture enabled and update an existing endpoint with it.\n", "data_capture_config = DataCaptureConfig(\n", "    enable_capture=True,\n", "    sampling_percentage=100,\n", "    destination_s3_uri=s3_capture_upload_path,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "6c684e68", "metadata": {}, "outputs": [], "source": [ "# Now update the endpoint with data capture enabled, writing to s3_capture_upload_path.\n", "predictor.update_data_capture_config(data_capture_config)" ] }, { "cell_type": "markdown", "id": "8a245b1d", "metadata": {}, "source": [ "Note: updating your endpoint's data capture config can take 3-5 minutes. A progress bar is displayed in the cell above; completion is indicated by `---------------!` and the cell execution number. Your endpoint status will show as `Updating` under SageMaker resources > Endpoints while this is in progress and `InService` when the updated endpoint is ready for requests." ] }, { "cell_type": "markdown", "id": "7213bbab", "metadata": {}, "source": [ "### Invoke the deployed model endpoint to generate predictions" ] }, { "cell_type": "markdown", "id": "f2aed558", "metadata": {}, "source": [ "Now send data to this endpoint to get inferences in real time. \n", "\n", "With data capture enabled in the previous step, the request and response payload, along with some additional metadata, is saved to the S3 location specified in `DataCaptureConfig`." ] }, { "cell_type": "code", "execution_count": null, "id": "34abcdd9", "metadata": {}, "outputs": [], "source": [ "# Read in the training set for its schema and to compute the data quality baseline.\n", "train_df = pd.read_csv(\"train-headers.csv\")" ] }, { "cell_type": "code", "execution_count": null, "id": "ae1ac67f", "metadata": {}, "outputs": [], "source": [ "# Use the test set to create a file without headers and labels to mirror the data format at inference time.\n", "test_df = pd.read_csv(\"test.csv\", header=None)\n", "test_df.drop(test_df.columns[0], axis=1, inplace=True)\n", "test_df.sample(180).to_csv(\"test-samples-no-header.csv\", header=False, index=False)" ] },
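{ "cell_type": "markdown", "id": "3f8e0a31", "metadata": {}, "source": [ "As an optional sanity check (a sketch), confirm the sample file has 180 rows and one fewer column than the training set, since the label column was dropped above." ] }, { "cell_type": "code", "execution_count": null, "id": "3f8e0a32", "metadata": {}, "outputs": [], "source": [ "# Sketch: the sample file should contain 180 rows and the training-set\n", "# columns minus the label column dropped above.\n", "sample_check_df = pd.read_csv(\"test-samples-no-header.csv\", header=None)\n", "print(\"Sample shape:\", sample_check_df.shape)\n", "print(\"Expected columns:\", train_df.shape[1] - 1)" ] },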
{ "cell_type": "markdown", "id": "ad7152ca", "metadata": {}, "source": [ "Now send a test batch of 180 requests to the model endpoint. These inputs will be captured along with the endpoint's output predictions and sent to your `s3_capture_upload_path`." ] }, { "cell_type": "code", "execution_count": null, "id": "c3d9d1aa", "metadata": {}, "outputs": [], "source": [ "print(f\"Sending test traffic to the endpoint {endpoint_name}. \\nPlease wait...\")\n", "\n", "test_sample_df = pd.read_csv(\"test-samples-no-header.csv\", header=None, index_col=False)\n", "\n", "response = predictor.predict(data=test_sample_df.to_numpy())\n", "\n", "print(\"Done!\")" ] }, { "cell_type": "markdown", "id": "32f04d43", "metadata": {}, "source": [ "### View captured data" ] }, { "cell_type": "markdown", "id": "6e340de7", "metadata": {}, "source": [ "List the data capture files stored in Amazon S3. \n", "\n", "There should be different files from different time periods, organized in S3 by the hour in which the invocation occurred, in the format: \n", "\n", "`s3://{destination-bucket-prefix}/{endpoint-name}/{AllTraffic or model-variant-name}/yyyy/mm/dd/hh/filename.jsonl`" ] }, { "cell_type": "code", "execution_count": null, "id": "59eef988", "metadata": {}, "outputs": [], "source": [ "print(\"Waiting 60 seconds for captures to show up\", end=\"\")\n", "\n", "for _ in range(60):\n", "    capture_files = sorted(S3Downloader.list(f\"{s3_capture_upload_path}\"))\n", "    if capture_files:\n", "        break\n", "    print(\".\", end=\"\", flush=True)\n", "    time.sleep(1)\n", "\n", "print(\"\\nFound Capture Files:\")\n", "print(\"\\n \".join(capture_files[-10:]))" ] }, { "cell_type": "markdown", "id": "a215f8e1", "metadata": {}, "source": [ "Next, view the content of a single capture file by looking at a few of its most recent lines." ] }, { "cell_type": "code", "execution_count": null, "id": "9b75727b", "metadata": {}, "outputs": [], "source": [ "# Grab the last few lines of the newest capture file; each line is one JSON record.\n", "capture_file = S3Downloader.read_file(capture_files[-1]).split(\"\\n\")[-10:-1]\n", "print(capture_file[-1])" ] }, { "cell_type": "markdown", "id": "0b2fe0ad", "metadata": {}, "source": [ "View a single captured record below, formatted as JSON." ] }, { "cell_type": "code", "execution_count": null, "id": "473db6e6", "metadata": {}, "outputs": [], "source": [ "print(json.dumps(json.loads(capture_file[-1]), indent=2))" ] },
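{ "cell_type": "markdown", "id": "3f8e0a41", "metadata": {}, "source": [ "A minimal sketch for ad-hoc inspection: flatten a few captured records into a DataFrame. This assumes the standard capture schema shown above, with `captureData.endpointInput`/`captureData.endpointOutput` payloads and `eventMetadata`." ] }, { "cell_type": "code", "execution_count": null, "id": "3f8e0a42", "metadata": {}, "outputs": [], "source": [ "# Sketch (assumption: records follow the standard data capture schema printed\n", "# above, with \"captureData\" and \"eventMetadata\" keys).\n", "records = [json.loads(line) for line in capture_file if line.strip()]\n", "\n", "rows = []\n", "for rec in records:\n", "    capture = rec.get(\"captureData\", {})\n", "    meta = rec.get(\"eventMetadata\", {})\n", "    rows.append(\n", "        {\n", "            \"inference_id\": meta.get(\"inferenceId\"),\n", "            \"event_time\": meta.get(\"inferenceTime\"),\n", "            \"input_payload\": capture.get(\"endpointInput\", {}).get(\"data\"),\n", "            \"output_payload\": capture.get(\"endpointOutput\", {}).get(\"data\"),\n", "        }\n", "    )\n", "\n", "pd.DataFrame(rows)" ] },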
] }, { "cell_type": "code", "execution_count": null, "id": "5d27799c", "metadata": {}, "outputs": [], "source": [ "import threading\n", "\n", "class WorkerThread(threading.Thread):\n", " def __init__(self, do_run, *args, **kwargs):\n", " super(WorkerThread, self).__init__(*args, **kwargs)\n", " self.__do_run = do_run\n", " self.__terminate_event = threading.Event()\n", "\n", " def terminate(self):\n", " self.__terminate_event.set()\n", "\n", " def run(self):\n", " while not self.__terminate_event.is_set():\n", " self.__do_run(self.__terminate_event)" ] }, { "cell_type": "markdown", "id": "728bf553", "metadata": {}, "source": [ "Now you define a function that your thread will invoke continuously to send test samples to the model endpoint." ] }, { "cell_type": "code", "execution_count": null, "id": "23c332d4", "metadata": {}, "outputs": [], "source": [ "def invoke_endpoint(terminate_event):\n", " with open(\"test-samples-no-header.csv\", \"r\") as f:\n", " i = 0\n", " for row in f:\n", " payload = row.rstrip(\"\\n\")\n", " response = sagemaker_runtime_client.invoke_endpoint(\n", " EndpointName=endpoint_name,\n", " ContentType=\"text/csv\",\n", " Body=payload,\n", " InferenceId=str(i), # unique ID per row\n", " )\n", " i += 1\n", " response[\"Body\"].read()\n", " time.sleep(1)\n", " if terminate_event.is_set():\n", " break\n", "\n", "\n", "# Keep invoking the endpoint with test data\n", "invoke_endpoint_thread = WorkerThread(do_run=invoke_endpoint)\n", "invoke_endpoint_thread.start()" ] }, { "cell_type": "markdown", "id": "83b5a08c", "metadata": { "tags": [] }, "source": [ "## Monitor data quality" ] }, { "cell_type": "markdown", "id": "c8c85562", "metadata": {}, "source": [ "Data quality monitoring automatically monitors machine learning (ML) models in production and notifies you when data quality issues arise. ML models in production have to make predictions on real-life data that is not carefully curated like most training datasets. If the statistical nature of the data that your model receives while in production drifts away from the nature of the baseline data it was trained on, the model begins to lose accuracy in its predictions. Amazon SageMaker Model Monitor uses rules to detect data drift and alerts you when it happens." ] }, { "cell_type": "markdown", "id": "b4bc969d", "metadata": { "tags": [] }, "source": [ "### Configure `DefaultModelMonitor` for detecting data drift." ] }, { "cell_type": "code", "execution_count": null, "id": "16bb4956", "metadata": {}, "outputs": [], "source": [ "data_quality_monitor = DefaultModelMonitor(\n", " role=role,\n", " instance_count=1,\n", " instance_type='ml.m5.xlarge',\n", " volume_size_in_gb=20,\n", " max_runtime_in_seconds=3600,\n", ")" ] }, { "cell_type": "markdown", "id": "1da1ace3", "metadata": {}, "source": [ "\n", "\n", "SageMaker Model Monitor provides a built-in container that provides the ability to suggest the constraints automatically for CSV and flat JSON input. This sagemaker-model-monitor-analyzer container also provides you with a range of model monitoring capabilities, including constraint validation against a baseline, and emitting Amazon CloudWatch metrics. This container is based on Spark and is built with [Deequ \"unit tests for data\"](https://github.com/awslabs/deequ). All column names in your baseline dataset must be compliant with Spark. 
{ "cell_type": "markdown", "id": "83b5a08c", "metadata": { "tags": [] }, "source": [ "## Monitor data quality" ] }, { "cell_type": "markdown", "id": "c8c85562", "metadata": {}, "source": [ "Data quality monitoring automatically monitors machine learning (ML) models in production and notifies you when data quality issues arise. ML models in production have to make predictions on real-life data that is not carefully curated like most training datasets. If the statistical nature of the data that your model receives in production drifts away from the nature of the baseline data it was trained on, the model begins to lose accuracy in its predictions. Amazon SageMaker Model Monitor uses rules to detect data drift and alerts you when it happens." ] }, { "cell_type": "markdown", "id": "b4bc969d", "metadata": { "tags": [] }, "source": [ "### Configure `DefaultModelMonitor` for detecting data drift" ] }, { "cell_type": "code", "execution_count": null, "id": "16bb4956", "metadata": {}, "outputs": [], "source": [ "data_quality_monitor = DefaultModelMonitor(\n", "    role=role,\n", "    instance_count=1,\n", "    instance_type=\"ml.m5.xlarge\",\n", "    volume_size_in_gb=20,\n", "    max_runtime_in_seconds=3600,\n", ")" ] }, { "cell_type": "markdown", "id": "1da1ace3", "metadata": {}, "source": [ "SageMaker Model Monitor provides a built-in container that can suggest constraints automatically for CSV and flat JSON input. This `sagemaker-model-monitor-analyzer` container also provides a range of model monitoring capabilities, including constraint validation against a baseline and emitting Amazon CloudWatch metrics. The container is based on Spark and is built with [Deequ \"unit tests for data\"](https://github.com/awslabs/deequ), so all column names in your baseline dataset must be Spark-compliant: use only lowercase characters, with `_` as the only special character.\n", "\n", "The training dataset that you used to train the model is usually a good baseline dataset. The training dataset schema and the inference dataset schema should match exactly (the same number and order of features). Note that the prediction/output columns are assumed to be the first columns in the training dataset. From the training dataset, you can ask SageMaker to suggest a set of baseline constraints and generate descriptive statistics to explore the data." ] }, { "cell_type": "markdown", "id": "54c5e743", "metadata": {}, "source": [ "### Run a data quality baseline job" ] }, { "cell_type": "markdown", "id": "bb2d8619", "metadata": {}, "source": [ "Start a data quality baseline processing job with `DefaultModelMonitor.suggest_baseline(..)` using the Amazon SageMaker Python SDK. This uses an Amazon SageMaker Model Monitor prebuilt container that generates baseline statistics and suggested baseline constraints for the dataset, writing them to the `output_s3_uri` location that you specify.\n", "\n", "The baseline statistics and constraints are needed as a standard against which data drift and other data quality issues can be detected." ] }, { "cell_type": "markdown", "id": "bcbd7365", "metadata": {}, "source": [ "Note: the baseline job will take about 5 minutes. If the log suppression below does not work, you can clear the cell output with `Edit > Clear Output`." ] }, { "cell_type": "code", "execution_count": null, "id": "820ea60b", "metadata": {}, "outputs": [], "source": [ "data_quality_baseline_job_name = f\"DataQualityBaselineJob-{datetime.utcnow():%Y-%m-%d-%H%M}\"\n", "\n", "data_quality_baseline_job = data_quality_monitor.suggest_baseline(\n", "    job_name=data_quality_baseline_job_name,\n", "    baseline_dataset=\"train-headers.csv\",\n", "    dataset_format=DatasetFormat.csv(header=True),\n", ")\n", "\n", "data_quality_baseline_job.wait(logs=False)" ] }, { "cell_type": "markdown", "id": "342f7939", "metadata": {}, "source": [ "The Amazon SageMaker Model Monitor prebuilt container computes per-feature statistics. The statistics are calculated both for the baseline dataset and for the current dataset being analyzed. The cell below returns a table of the computed baseline feature statistics." ] }, { "cell_type": "code", "execution_count": null, "id": "929e194d", "metadata": {}, "outputs": [], "source": [ "latest_data_quality_baseline_job = data_quality_monitor.latest_baselining_job\n", "schema_df = pd.json_normalize(latest_data_quality_baseline_job.baseline_statistics().body_dict[\"features\"])\n", "schema_df.head(10)" ] }, { "cell_type": "markdown", "id": "7446d60b", "metadata": {}, "source": [ "SageMaker Model Monitor also suggests constraints per feature based on the baseline statistics. View them using the cell below:" ] }, { "cell_type": "code", "execution_count": null, "id": "4f4b471e", "metadata": {}, "outputs": [], "source": [ "constraints_df = pd.json_normalize(latest_data_quality_baseline_job.suggested_constraints().body_dict[\"features\"])\n", "constraints_df.head(10)" ] },
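{ "cell_type": "markdown", "id": "3f8e0a61", "metadata": {}, "source": [ "Before scheduling, you can optionally inspect (and, if needed, tweak) the monitoring configuration embedded in the suggested constraints. The sketch below assumes the `monitoring_config` layout of the generated `constraints.json`; the example tweak is left commented out." ] }, { "cell_type": "code", "execution_count": null, "id": "3f8e0a62", "metadata": {}, "outputs": [], "source": [ "# Sketch: view the monitoring configuration suggested by the baseline job\n", "# (assumption: constraints.json exposes a \"monitoring_config\" section).\n", "constraints = latest_data_quality_baseline_job.suggested_constraints()\n", "print(json.dumps(constraints.body_dict.get(\"monitoring_config\", {}), indent=2))\n", "\n", "# Illustrative tweak: loosen the distribution-comparison threshold before\n", "# passing these constraints to the monitoring schedule.\n", "# constraints.body_dict[\"monitoring_config\"][\"distribution_constraints\"][\"comparison_threshold\"] = 0.2" ] },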
] }, { "cell_type": "code", "execution_count": null, "id": "3b5a2008", "metadata": {}, "outputs": [], "source": [ "## Create a data quality monitoring schedule name.\n", "data_quality_monitor_schedule_name = (\n", " f\"xgboost-dm-data-monitoring-schedule-{datetime.utcnow():%Y-%m-%d-%H%M}\"\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "3b4ad9d0", "metadata": {}, "outputs": [], "source": [ "# Create an EndpointInput\n", "endpointInput = EndpointInput(\n", " endpoint_name=predictor.endpoint_name,\n", " destination=\"/opt/ml/processing/input_data\",\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "62271c49", "metadata": {}, "outputs": [], "source": [ "# Specify where to write the data quality monitoring results report to.\n", "data_quality_baseline_job_result_uri = f\"{s3_baseline_results_path}/data_quality\"\n", "\n", "response = data_quality_monitor.create_monitoring_schedule(\n", " monitor_schedule_name=data_quality_monitor_schedule_name,\n", " endpoint_input=endpointInput,\n", " output_s3_uri=data_quality_baseline_job_result_uri,\n", " constraints=latest_data_quality_baseline_job.suggested_constraints(),\n", " # Create the monitoring schedule to execute every hour. \n", " schedule_cron_expression=CronExpressionGenerator.hourly(),\n", " enable_cloudwatch_metrics=True,\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "f5ea0f29", "metadata": {}, "outputs": [], "source": [ "# You will see the monitoring schedule in the 'Scheduled' status\n", "data_quality_monitor.describe_schedule()" ] }, { "cell_type": "code", "execution_count": null, "id": "f8829393", "metadata": {}, "outputs": [], "source": [ "# Check default model monitor created.\n", "predictor.list_monitors()" ] }, { "cell_type": "code", "execution_count": null, "id": "a5088dae", "metadata": {}, "outputs": [], "source": [ "# Initially there will be no executions since the first execution happens at the top of the hour\n", "# Note that it is common for the execution to launch upto 20 min after the hour.\n", "executions = data_quality_monitor.list_executions()\n", "executions[:5]" ] }, { "cell_type": "markdown", "id": "bd2f520e", "metadata": { "tags": [] }, "source": [ "## Cleanup" ] }, { "cell_type": "markdown", "id": "53364015", "metadata": {}, "source": [ "Well done! If you are finished with the notebook, run the following cells to terminate lab resources and prevent continued charges." ] }, { "cell_type": "markdown", "id": "5156a0a3", "metadata": {}, "source": [ "First, stop the worker threads." ] }, { "cell_type": "code", "execution_count": null, "id": "0d2461a0", "metadata": {}, "outputs": [], "source": [ "invoke_endpoint_thread.terminate()" ] }, { "cell_type": "markdown", "id": "b7b3c656", "metadata": {}, "source": [ "Finally, stop and then delete all monitors scheduled to the endpoint." ] }, { "cell_type": "markdown", "id": "f8ff7edd", "metadata": {}, "source": [ "If the following cell throws an error similar to `ClientError: An error occurred (ValidationException) when calling the DeleteMonitoringSchedule operation: can't delete schedule as it has in-progress executions`, wait a few minutes and run this cell again. You can't delete a monitor if a monitoring job is executing, once it is done, you can delete the monitoring schedule." 
] }, { "cell_type": "code", "execution_count": null, "id": "4fe2eb9c", "metadata": { "tags": [] }, "outputs": [], "source": [ "model_monitors = predictor.list_monitors()\n", "\n", "for monitor in model_monitors:\n", " monitor.stop_monitoring_schedule()\n", " monitor.delete_monitoring_schedule()" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-2:429704687514:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 5 }