{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Amazon SageMaker Workshop\n", "### _**Monitoring**_\n", "\n", "---\n", "In this part of the workshop we configure SageMaker Model Monitor for the endpoint we deployed.\n", "\n", "\n", "This notebook walks you through some of the main features of Amazon SageMaker Studio. \n", "\n", "* [SageMaker Model Monitor](https://docs.aws.amazon.com/sagemaker/latest/dg/model-monitor.html)\n", " * Monitor the quality of your model\n", " * Set alerts for when model quality deviates\n", " \n", "---\n", "\n", "## Contents\n", "\n", "1. [SageMaker Model Monitor](#SageMaker-Model-Monitor)\n", " - Creating a baseline\n", " - Analyzing subsequent captures for data quality issues\n", " \n", "---\n", "\n", "## Background\n", "\n", "In the previous [Deployment of real-time endpoints](../4-Deployment/Part1/deployment_hosting.ipynb) lab we created a SageMaker endpoint to host our model to predict Mobile customer churn.\n", "\n", "Let's import the libraries for this lab:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from time import strftime, gmtime, sleep\n", "from threading import Thread\n", "\n", "import pandas as pd\n", "\n", "import boto3\n", "\n", "import sagemaker\n", "from sagemaker import get_execution_role\n", "from sagemaker.model_monitor import DataCaptureConfig, DatasetFormat, DefaultModelMonitor\n", "from sagemaker.s3 import S3Uploader, S3Downloader" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sess = boto3.Session()\n", "sm = sess.client('sagemaker')\n", "role = sagemaker.get_execution_role()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Load variables for this lab:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store -r bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store -r prefix" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bucket, prefix" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### - if you _**skipped**_ do the lab `4-Deployment/RealTime` follow instructions:\n", "\n", " - **run this:**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # Uncomment if you have not done Lab 4-Deployment/RealTime\n", "\n", "# from config.solution_lab4 import get_endpoint_from_lab4\n", "# endpoint_name, _ = get_endpoint_from_lab4()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "### - if you _**have done**_ the lab `4-Deployment/RealTime` follow instructions:\n", "\n", " - **run this:**" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# # Uncomment if you've done Lab 4-Deployment/RealTime\n", "\n", "#%store -r endpoint_name\n", "#endpoint_name" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## Amazon SageMaker Model Monitor\n", "\n", "Amazon SageMaker Model Monitor lets you monitor and evaluate the data observed by endpoints. It works like this:\n", "1. We need to create a baseline that we can use to compare real-time traffic against. \n", "1. When a baseline is ready, we can set up a schedule to continously evaluate and compare against the baseline.\n", "1. 
We can send synthetic traffic to trigger alarms.\n", "\n", "**Important**: It takes an hour or more to complete this section because the shortest monitoring polling time is one hour. The following graphic shows how the monitoring results look after running for a few hours and some of the errors triggered by synthetic traffic.\n", "\n", "" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Baselining and continuous monitoring\n", "\n", "#### 1. Constraint suggestion with the baseline (training) dataset\n", "\n", "The training dataset that you use to train a model is usually a good baseline dataset. Note that the training dataset schema and the inference dataset schema must match exactly (for example, they should have the same number and type of features).\n", "\n", "Using our training dataset, let's ask Amazon SageMaker Model Monitor to suggest a set of baseline `constraints` and generate descriptive `statistics` so we can explore the data. For this example, let's upload the training dataset, which we used to train the model. We'll use the dataset file with column headers so we have descriptive feature names." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "baseline_prefix = prefix + '/baselining'\n", "baseline_data_prefix = baseline_prefix + '/data'\n", "baseline_results_prefix = baseline_prefix + '/results'\n", "\n", "baseline_data_uri = f's3://{bucket}/{baseline_data_prefix}'\n", "baseline_results_uri = f's3://{bucket}/{baseline_results_prefix}'\n", "print(f'Baseline data uri: {baseline_data_uri}')\n", "print(f'Baseline results uri: {baseline_results_uri}')\n", "baseline_data_path = S3Uploader.upload(\"config/training-dataset-with-header.csv\", baseline_data_uri)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Create a baselining job with the training dataset\n", "\n", "Now that we have the training data ready in S3, let's start a job to `suggest` constraints. To generate the constraints, the `suggest_baseline` helper starts a `ProcessingJob` using a prebuilt container provided by Amazon SageMaker." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "my_default_monitor = DefaultModelMonitor(role=role,\n", " instance_count=1,\n", " instance_type='ml.m5.xlarge',\n", " volume_size_in_gb=20,\n", " max_runtime_in_seconds=3600,\n", " )\n", "\n", "baseline_job = my_default_monitor.suggest_baseline(baseline_dataset=baseline_data_path,\n", " dataset_format=DatasetFormat.csv(header=True),\n", " output_s3_uri=baseline_results_uri,\n", " wait=True\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Once the job succeeds, we can explore the `baseline_results_uri` location in S3 to see what files were stored there." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"Found Files:\")\n", "S3Downloader.list(f\"s3://{bucket}/{baseline_results_prefix}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We have a `constraints.json` file with the suggested constraints, and a `statistics.json` file containing statistical information about the data in the baseline." 
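] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a quick sanity check, we can read one of these files straight from S3. The following is a minimal sketch; it assumes the `constraints.json` file listed above exists under `baseline_results_uri`:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import json\n", "\n", "# Sketch: read the raw suggested-constraints JSON from S3 and preview the first part of it\n", "constraints_raw = S3Downloader.read_file(f'{baseline_results_uri}/constraints.json')\n", "print(json.dumps(json.loads(constraints_raw), indent=2)[:1000])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The SageMaker SDK also parses these files for us; let's flatten the statistics and the suggested constraints into DataFrames:" 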
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "baseline_job = my_default_monitor.latest_baselining_job\n", "schema_df = pd.json_normalize(baseline_job.baseline_statistics().body_dict[\"features\"])\n", "schema_df.head(10)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "constraints_df = pd.json_normalize(baseline_job.suggested_constraints().body_dict[\"features\"])\n", "constraints_df.head(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 2. Analyzing subsequent captures for data quality issues\n", "\n", "Now that we've generated a baseline dataset and processed it to get baseline statistics and constraints, let's monitor and analyze the data being sent to the endpoint with monitoring schedules." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Create a schedule\n", "First, let's create a monitoring schedule for the endpoint. The schedule specifies the cadence at which we want to run a new processing job so that we can compare recent data captures to the baseline." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# First, copy over some test scripts to the S3 bucket so that they can be used for pre and post processing\n", "code_prefix = '{}/code'.format(prefix)\n", "pre_processor_script = S3Uploader.upload('preprocessor.py', 's3://{}/{}'.format(bucket,code_prefix))\n", "s3_code_postprocessor_uri = S3Uploader.upload('postprocessor.py', 's3://{}/{}'.format(bucket,code_prefix))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We are ready to create a model monitoring schedule for the Endpoint created before and also the baseline resources (constraints and statistics) which were generated above." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.model_monitor import CronExpressionGenerator\n", "from time import gmtime, strftime\n", "\n", "reports_prefix = '{}/reports'.format(prefix)\n", "s3_report_path = 's3://{}/{}'.format(bucket,reports_prefix)\n", "\n", "mon_schedule_name = 'workshop-xgboost-model-schedule-' + strftime(\"%Y-%m-%d-%H-%M-%S\", gmtime())\n", "my_default_monitor.create_monitoring_schedule(monitor_schedule_name=mon_schedule_name,\n", " endpoint_input=endpoint_name,\n", " #record_preprocessor_script=pre_processor_script,\n", " post_analytics_processor_script=s3_code_postprocessor_uri,\n", " output_s3_uri=s3_report_path,\n", " statistics=my_default_monitor.baseline_statistics(),\n", " constraints=my_default_monitor.suggested_constraints(),\n", " schedule_cron_expression=CronExpressionGenerator.hourly(),\n", " enable_cloudwatch_metrics=True,\n", " )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### 3. Start generating some artificial traffic\n", "The following block starts a thread to send some traffic to the endpoint. This allows us to continue to send traffic to the endpoint so that we'll have data continually captured for analysis. If there is no traffic, the monitoring jobs will start to fail later.\n", "\n", "To terminate this thread, you need to stop the kernel.\n", "\n", "Obs.: observe the usage of the python `boto3` library for making inference requests in production with the SageMaker Runtime. Before we were using the SageMaker SDK, which simmplifies the development." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "runtime_client = boto3.client('runtime.sagemaker')\n", "\n", "# (just repeating code from above for convenience/ able to run this section independently)\n", "def invoke_endpoint(ep_name, file_name, runtime_client):\n", " with open(file_name, 'r') as f:\n", " for row in f:\n", " payload = row.rstrip('\\n')\n", " response = runtime_client.invoke_endpoint(EndpointName=ep_name,\n", " ContentType='text/csv', \n", " Body=payload,\n", " Accept='text/csv')\n", " response['Body'].read()\n", " sleep(1)\n", " \n", "def invoke_endpoint_forever():\n", " while True:\n", " invoke_endpoint(endpoint_name, 'config/test-dataset-input-cols.csv', runtime_client)\n", " \n", "thread = Thread(target = invoke_endpoint_forever)\n", "thread.start()\n", "\n", "# Note that you need to stop the kernel to stop the invocations" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### List executions\n", "Once the schedule is set up, jobs start at the specified intervals. The following code lists the last five executions. If you run this code soon after creating the hourly schedule, you might not see any executions listed. To see executions, you might have to wait until you cross the hour boundary (in UTC). The code includes the logic for waiting." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "mon_executions = my_default_monitor.list_executions()\n", "if len(mon_executions) == 0:\n", " print(\"We created a hourly schedule above and it will kick off executions ON the hour.\\nWe will have to wait till we hit the hour...\")\n", "\n", "while len(mon_executions) == 0:\n", " print(\"Waiting for the 1st execution to happen...\")\n", " sleep(60)\n", " mon_executions = my_default_monitor.list_executions() " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### Evaluate the latest execution and list the generated reports" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "latest_execution = mon_executions[-1]\n", "latest_execution.wait()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(\"Latest execution result: {}\".format(latest_execution.describe()['ExitMessage']))\n", "report_uri = latest_execution.output.destination\n", "\n", "print(\"Found Report Files:\")\n", "S3Downloader.list(report_uri)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "##### List violations\n", "\n", "If there are any violations compared to the baseline, they will be generated here. Let's list the violations." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "violations = my_default_monitor.latest_monitoring_constraint_violations()\n", "pd.set_option('display.max_colwidth', None)\n", "constraints_df = pd.json_normalize(violations.body_dict[\"violations\"])\n", "constraints_df.head(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You can plug in the processing job arn for a single execution of the monitoring into [this notebook](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker_model_monitor/visualization/SageMaker-Model-Monitor-Visualize.ipynb) to see more detailed visualizations of the violations and distribution statistics of the data captue that was processed in that execution\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "latest_execution.describe()['ProcessingJobArn']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "# [You can now go to the final lab 6-Pipelines](../6-Pipelines/pipelines.ipynb)" ] } ], "metadata": { "celltoolbar": "Tags", "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" }, "notice": "Copyright 2017 Amazon.com, Inc. or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License." }, "nbformat": 4, "nbformat_minor": 4 }