{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "## Execute computer vision inference on large videos using managed queues, notifications, and automatic scaling with Amazon SageMaker Asynchronous Endpoints" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Table of Contents" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "* Background\n", "* Setup\n", "* Download and run a pre-trained Mask R-CNN model on a sample image\n", "* Create model archive and upload to S3\n", "* Create SageMaker model with PyTorch inference container\n", "* Real-time hosted endpoint deployment and inference\n", "* Create asynchronous inference endpoints\n", "* Invoke asynchronous endpoint\n", "* Enable autoscaling\n", "* Cleanup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Background" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "AWS customers are increasingly using computer vision (CV) models on large input payloads that can take several minutes of processing time. For example, space technology companies work with streams of high-resolution satellite imagery to detect particular objects of interest. Similarly, healthcare companies process high-resolution biomedical images or videos, such as echocardiograms, to detect anomalies. Media companies also scan images and videos uploaded by their customers to ensure they are compliant and free of copyright violations. These applications receive bursts of incoming traffic at different times of day and require near-real-time processing with completion notifications at low cost. \n", "\n", "In this notebook, we serve a PyTorch computer vision model with SageMaker asynchronous inference endpoints to process a burst of traffic of large video payloads. We demonstrate the new capabilities of an internal queue with user-defined concurrency and completion notifications. We configure autoscaling of instances, including scaling down to 0 when traffic subsides and scaling back up as the request queue fills. We use [SageMaker’s pre-built TorchServe container](https://sagemaker.readthedocs.io/en/stable/frameworks/pytorch/using_pytorch.html) with a custom inference script for preprocessing the videos before model invocation. The notebook demonstrates:\n", "\n", " 1. Large payload input of a high-resolution video segment of 70 MB\n", " 2. Large payload output from a PyTorch pre-trained Mask R-CNN model \n", " 3. Long model response time of about 30 seconds on a GPU instance\n", " 4. Auto-queuing of inference requests with asynchronous inference\n", " 5. Notifications of completed requests via SNS \n", " 6. Auto-scaling of endpoints based on the queue-length metric, with the minimum instance count set to 0" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![Workflow](images/Async_Diagram.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Setup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "If you run this notebook in SageMaker Studio, make sure `ipywidgets` is installed and restart the kernel: uncomment the code in the next cell and run it."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# %%capture\n", "# import IPython\n", "# import sys\n", "\n", "# !{sys.executable} -m pip install ipywidgets\n", "# IPython.Application.instance().kernel.do_shutdown(True)  # restart the kernel so the change takes effect" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!python -m pip install --upgrade pip --quiet\n", "!pip install -U awscli --quiet\n", "!pip install torch==1.8.0 --quiet\n", "!pip install torchvision==0.9.0 --quiet\n", "!pip install -U sagemaker --quiet" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import torch\n", "import torchvision\n", "import torchvision.models as models\n", "import sagemaker\n", "from sagemaker import get_execution_role\n", "from sagemaker.utils import name_from_base\n", "from sagemaker.pytorch import PyTorchModel\n", "import boto3\n", "import datetime\n", "import time\n", "from time import strftime,gmtime\n", "import json\n", "import os\n", "import urllib\n", "import sys\n", "import io\n", "\n", "role = get_execution_role()\n", "boto_session = boto3.session.Session()\n", "sm_session = sagemaker.session.Session()\n", "sm_client = boto_session.client(\"sagemaker\")\n", "sm_runtime = boto_session.client(\"sagemaker-runtime\")\n", "sns_client = boto3.client('sns')\n", "region = boto_session.region_name\n", "bucket = sm_session.default_bucket()\n", "prefix = 'async-inference-maskrcnn'\n", "\n", "print(region)\n", "print(role)\n", "print(bucket)\n", "print(prefix)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(torch.__version__)\n", "print(torchvision.__version__)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Specify your IAM role. Go to the AWS IAM console (https://console.aws.amazon.com/iam/home) and add the following policies to your IAM role:\n", "* AmazonSageMakerFullAccess\n", "* Amazon S3 access: Apply this to get and put objects in your Amazon S3 bucket. Replace `bucket_name` with the name of your Amazon S3 bucket: \n", "\n", "```json\n", "{\n", "    \"Version\": \"2012-10-17\",\n", "    \"Statement\": [\n", "        {\n", "            \"Action\": [\n", "                \"s3:GetObject\",\n", "                \"s3:PutObject\",\n", "                \"s3:AbortMultipartUpload\",\n", "                \"s3:ListBucket\"\n", "            ],\n", "            \"Effect\": \"Allow\",\n", "            \"Resource\": \"arn:aws:s3:::bucket_name/*\"\n", "        }\n", "    ]\n", "}\n", "```\n", "\n", "* (Optional) Amazon SNS access: Add `sns:Publish` on the topics you define. Apply this if you plan to use Amazon SNS to receive notifications.\n", "\n", "```json\n", "{\n", "    \"Version\": \"2012-10-17\",\n", "    \"Statement\": [\n", "        {\n", "            \"Action\": [\n", "                \"sns:Publish\"\n", "            ],\n", "            \"Effect\": \"Allow\",\n", "            \"Resource\": \"arn:aws:sns:us-east-2:123456789012:MyTopic\"\n", "        }\n", "    ]\n", "}\n", "```\n", "\n", "* (Optional) AWS KMS decrypt and encrypt permissions if your Amazon S3 bucket is encrypted."
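, "\n", "A minimal sketch of such a KMS policy (the key ARN below is a placeholder; scope it to the customer managed key that encrypts your bucket):\n", "\n", "```json\n", "{\n", "    \"Version\": \"2012-10-17\",\n", "    \"Statement\": [\n", "        {\n", "            \"Action\": [\n", "                \"kms:Decrypt\",\n", "                \"kms:GenerateDataKey\"\n", "            ],\n", "            \"Effect\": \"Allow\",\n", "            \"Resource\": \"arn:aws:kms:us-east-2:123456789012:key/your-key-id\"\n", "        }\n", "    ]\n", "}\n", "```"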
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Download and run a pre-trained Mask R-CNN model on a sample image" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We use the pre-trained Mask R-CNN ResNet-50 FPN model from torchvision." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model = models.detection.maskrcnn_resnet50_fpn(pretrained=True)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import matplotlib.pyplot as plt\n", "import numpy as np\n", "import torchvision.transforms.functional as F\n", "\n", "plt.rcParams[\"savefig.bbox\"] = 'tight'\n", "\n", "def show(imgs):\n", "    if not isinstance(imgs, list):\n", "        imgs = [imgs]\n", "    fig, axs = plt.subplots(ncols=len(imgs), squeeze=False)\n", "    for i, img in enumerate(imgs):\n", "        img = img.detach()\n", "        img = F.to_pil_image(img)\n", "        axs[0, i].imshow(np.asarray(img))\n", "        axs[0, i].set(xticklabels=[], yticklabels=[], xticks=[], yticks=[])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from PIL import Image\n", "import torchvision.transforms as transforms\n", "\n", "img = io.BytesIO(open('images/birds.jpg', 'rb').read())\n", "birds_image = Image.open(img).convert('RGB')\n", "birds_image = transforms.ToTensor()(birds_image)\n", "print(type(birds_image))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model.eval()\n", "outputs = model([birds_image])\n", "output = outputs[0]" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Print the model output (boxes, labels, and confidence scores) as JSON\n", "res = []\n", "for output in outputs:\n", "    res.append({'boxes': output['boxes'].detach().numpy().tolist(),\n", "                'labels': output['labels'].detach().numpy().tolist(),\n", "                'scores': output['scores'].detach().numpy().tolist()})\n", "print(json.dumps(res))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Print the original image\n", "from torchvision.utils import make_grid\n", "grid = make_grid([birds_image])\n", "show(grid)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Print the original image with the model's output boxes" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from torchvision.utils import draw_bounding_boxes\n", "from torchvision.io import read_image\n", "score_threshold = .9\n", "birds_with_boxes = [\n", "    draw_bounding_boxes(read_image('images/birds.jpg'), boxes=output['boxes'][output['scores'] > score_threshold], width=10)\n", "]\n", "show(birds_with_boxes)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create model archive and upload to S3"
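] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The archive built below bundles the saved model weights with the custom inference script from `./code`. For orientation, here is a minimal sketch of the shape such a script takes with the SageMaker PyTorch serving container (`model_fn`/`input_fn`/`predict_fn`/`output_fn` are the container's handler hooks); it is illustrative only -- the actual preprocessing logic ships in `./code`:\n", "\n", "```python\n", "# code/inference.py -- illustrative sketch, not the shipped script\n", "import torch\n", "\n", "def model_fn(model_dir):\n", "    # Load the model.pth archived below\n", "    model = torch.load(f'{model_dir}/model.pth', map_location='cpu')\n", "    model.eval()\n", "    return model\n", "\n", "def input_fn(request_body, content_type):\n", "    # Decode the payload (here, video bytes) into model-ready tensors;\n", "    # the shipped script extracts and transforms video frames here\n", "    ...\n", "\n", "def predict_fn(data, model):\n", "    with torch.no_grad():\n", "        return model(data)\n", "\n", "def output_fn(prediction, accept):\n", "    # Serialize predictions (boxes, labels, scores) for the response\n", "    ...\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!mkdir -p model_and_code\n", "torch.save(model, 'model_and_code/model.pth')\n", "!mkdir -p model_and_code/code\n", "!cp ./code/* model_and_code/code\n", "!tar cvzf model.tar.gz -C model_and_code/ . 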
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.s3 import S3Uploader\n", "file_key = 'model.tar.gz'\n", "model_artifact = S3Uploader.upload(file_key,'s3://{}/{}/model'.format(bucket, prefix))\n", "print(model_artifact)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create SageMaker model with PyTorch inference container" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.image_uris import retrieve\n", "\n", "deploy_instance_type = 'ml.g4dn.xlarge'\n", "pytorch_inference_image_uri = retrieve('pytorch',\n", " region,\n", " version='1.7.1',\n", " py_version='py3',\n", " instance_type = deploy_instance_type,\n", " accelerator_type=None,\n", " image_scope='inference')\n", "print(pytorch_inference_image_uri)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "container = pytorch_inference_image_uri\n", "model_name = 'sagemaker-maskrcnn-{0}'.format(str(int(time.time())))\n", "print(container)\n", "print(model_name)\n", "\n", "create_model_response = sm_client.create_model(\n", " ModelName = model_name,\n", " ExecutionRoleArn = role,\n", " PrimaryContainer = {\n", " 'Image': container,\n", " 'ModelDataUrl': model_artifact,\n", " 'Environment': {\n", " 'TS_MAX_REQUEST_SIZE': '100000000', #default max request size is 6 Mb for torchserve, need to update it to support the 70 mb input payload\n", " 'TS_MAX_RESPONSE_SIZE': '100000000',\n", " 'TS_DEFAULT_RESPONSE_TIMEOUT': '1000'\n", " }\n", " }, \n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Real time hosted endpoint deployment and inference" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create an endpoint config name. Here we create one based on the date so it we can search endpoints based on creation time." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(model_name)\n", "endpoint_config_name = f\"maskrcnnEndpointConfig-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}\"\n", "create_endpoint_config_response = sm_client.create_endpoint_config(\n", "    EndpointConfigName=endpoint_config_name,\n", "    ProductionVariants=[\n", "        {\n", "            \"VariantName\": \"variant1\",\n", "            \"ModelName\": model_name,\n", "            \"InstanceType\": \"ml.g4dn.xlarge\",\n", "            \"InitialInstanceCount\": 1\n", "        }\n", "    ]\n", ")\n", "print(f\"Created EndpointConfig: {create_endpoint_config_response['EndpointConfigArn']}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint_name = f\"sm-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}\"\n", "response = sm_client.create_endpoint(\n", "    EndpointName=endpoint_name,\n", "    EndpointConfigName=endpoint_config_name\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "waiter = boto3.client('sagemaker').get_waiter('endpoint_in_service')\n", "print(\"Waiting for endpoint to create...\")\n", "waiter.wait(EndpointName=endpoint_name)\n", "resp = sm_client.describe_endpoint(EndpointName=endpoint_name)\n", "print(f\"Endpoint Status: {resp['EndpointStatus']}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Send a sample image to the real-time endpoint\n", "with open('images/birds.jpg', 'rb') as f:\n", "    feed_data = f.read()\n", "r = sm_runtime.invoke_endpoint(EndpointName=endpoint_name, Body=feed_data)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(r['Body'].read())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create asynchronous inference endpoints" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Unlike real-time hosted endpoints, asynchronous endpoints support scaling the instance count down to 0 by setting the minimum capacity to 0. With this feature, we can scale to 0 instances when there is no traffic and pay only when payloads arrive. 
Let's create an asynchronous endpoint to see it in action." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bucket_prefix = \"async-inference-blog\"\n", "resource_name = \"AsyncInferenceDemo-SNS\"" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create error and success SNS topics" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sns_client.create_topic(Name=\"Async-Demo-ErrorTopic\")\n", "error_topic = response['TopicArn']\n", "print(error_topic)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sns_client.create_topic(Name=\"Async-Demo-SuccessTopic\")\n", "success_topic = response['TopicArn']\n", "print(success_topic)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "List SNS topics" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sns_client.list_topics()\n", "topics = response[\"Topics\"]\n", "print(topics)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Optionally subscribe to the SNS topics" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Note: Replace with your email address\n", "\n", "# email_id = 'your-email@domain-name.com'\n", "# email_sub_1 = sns_client.subscribe(\n", "#     TopicArn=success_topic,\n", "#     Protocol='email',\n", "#     Endpoint=email_id)\n", "\n", "# email_sub_2 = sns_client.subscribe(\n", "#     TopicArn=error_topic,\n", "#     Protocol='email',\n", "#     Endpoint=email_id)\n", "\n", "# Note: You will need to confirm the subscription by clicking the link in the email you receive" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create an endpoint config name. Here we create one based on the date so we can search endpoints by creation time. The `AsyncInferenceConfig` block below sets the S3 output path, the SNS topics for completion notifications, and `MaxConcurrentInvocationsPerInstance` -- the number of requests one instance processes in parallel while the rest wait in the internal queue."
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(model_name)\n", "endpoint_config_name = f\"PyTorchAsyncEndpointConfig-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}\"\n", "create_endpoint_config_response = sm_client.create_endpoint_config(\n", "    EndpointConfigName=endpoint_config_name,\n", "    ProductionVariants=[\n", "        {\n", "            \"VariantName\": \"variant1\",\n", "            \"ModelName\": model_name,\n", "            \"InstanceType\": \"ml.g4dn.xlarge\",\n", "            \"InitialInstanceCount\": 1\n", "        }\n", "    ],\n", "    AsyncInferenceConfig={\n", "        \"OutputConfig\": {\n", "            \"S3OutputPath\": f\"s3://{bucket}/{bucket_prefix}/output\",\n", "            # Optionally specify Amazon SNS topics\n", "            \"NotificationConfig\": {\n", "                \"SuccessTopic\": success_topic,\n", "                \"ErrorTopic\": error_topic,\n", "            }\n", "        },\n", "        \"ClientConfig\": {\n", "            \"MaxConcurrentInvocationsPerInstance\": 2\n", "        }\n", "    }\n", ")\n", "print(f\"Created EndpointConfig: {create_endpoint_config_response['EndpointConfigArn']}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint_name = f\"sm-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}\"\n", "create_endpoint_response = sm_client.create_endpoint(EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)\n", "print(f\"Creating Endpoint: {create_endpoint_response['EndpointArn']}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "waiter = boto3.client('sagemaker').get_waiter('endpoint_in_service')\n", "print(\"Waiting for endpoint to create...\")\n", "waiter.wait(EndpointName=endpoint_name)\n", "resp = sm_client.describe_endpoint(EndpointName=endpoint_name)\n", "print(f\"Endpoint Status: {resp['EndpointStatus']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload input video file" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def upload_file(input_location):\n", "    prefix = f\"{bucket_prefix}/input\"\n", "    return sm_session.upload_data(\n", "        input_location, \n", "        bucket=sm_session.default_bucket(),\n", "        key_prefix=prefix, \n", "        extra_args={\"ContentType\": \"video/mp4\"})" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "input_1_location = \"videos/ducks.mp4\"\n", "input_1_s3_location = upload_file(input_1_location)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(input_1_s3_location)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(endpoint_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Invoke asynchronous endpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sm_runtime.invoke_endpoint_async(\n", "    EndpointName=endpoint_name, \n", "    InputLocation=input_1_s3_location)\n", "output_location = response['OutputLocation']\n", "print(f\"OutputLocation: {output_location}\")"
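] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The call above relies on defaults. `invoke_endpoint_async` also accepts optional parameters such as `ContentType` and `InferenceId`, an identifier you can assign to the request to help match notifications and outputs back to invocations. A sketch (the id value is illustrative):" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Optional parameters (sketch): set the content type explicitly and tag\n", "# the request with an identifier of our own choosing\n", "response = sm_runtime.invoke_endpoint_async(\n", "    EndpointName=endpoint_name,\n", "    InputLocation=input_1_s3_location,\n", "    ContentType='video/mp4',\n", "    InferenceId='demo-request-1')  # illustrative id\n", "print(response['OutputLocation'])"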
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from botocore.exceptions import ClientError\n", "\n", "def get_output(output_location):\n", "    # Poll S3 until the async output object appears\n", "    output_url = urllib.parse.urlparse(output_location)\n", "    while True:\n", "        try:\n", "            return sm_session.read_s3_file(bucket=output_url.netloc, key_prefix=output_url.path[1:])\n", "        except ClientError as e:\n", "            if e.response['Error']['Code'] == 'NoSuchKey':\n", "                print(\"waiting for output...\")\n", "                time.sleep(2)\n", "                continue\n", "            raise\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "output = get_output(output_location)\n", "print(f\"Output size in bytes: {sys.getsizeof(output)}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Trigger 10 asynchronous requests on a single instance" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "inferences = []\n", "for i in range(10):\n", "    start = time.time()\n", "    response = sm_runtime.invoke_endpoint_async(\n", "        EndpointName=endpoint_name, \n", "        InputLocation=input_1_s3_location)\n", "    output_location = response[\"OutputLocation\"]\n", "    inferences += [(input_1_s3_location, output_location)]\n", "    time.sleep(0.5)\n", "print(\"\\nAsync invocations for PyTorch serving default:\\n\")\n", "\n", "for input_file, output_location in inferences:\n", "    output = get_output(output_location)\n", "    print(f\"Input File: {input_file}, Output location: {output_location}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Enable autoscaling" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "client = boto3.client('application-autoscaling')  # Common client for Application Auto Scaling across services, including SageMaker\n", "\n", "resource_id = 'endpoint/' + endpoint_name + '/variant/' + 'variant1'  # The format in which Application Auto Scaling references the endpoint\n", "\n", "response = client.register_scalable_target(\n", "    ServiceNamespace='sagemaker', \n", "    ResourceId=resource_id,\n", "    ScalableDimension='sagemaker:variant:DesiredInstanceCount',\n", "    MinCapacity=0, \n", "    MaxCapacity=5\n", ")\n", "\n", "response = client.put_scaling_policy(\n", "    PolicyName='Invocations-ScalingPolicy',\n", "    ServiceNamespace='sagemaker',  # The namespace of the AWS service that provides the resource\n", "    ResourceId=resource_id,  # Endpoint name\n", "    ScalableDimension='sagemaker:variant:DesiredInstanceCount',  # SageMaker supports only instance count\n", "    PolicyType='TargetTrackingScaling',  # 'StepScaling'|'TargetTrackingScaling'\n", "    TargetTrackingScalingPolicyConfiguration={\n", "        'TargetValue': 5.0,  # Aim to keep about 5 queued requests per instance\n", "        'CustomizedMetricSpecification': {\n", "            'MetricName': 'ApproximateBacklogSizePerInstance',\n", "            'Namespace': 'AWS/SageMaker',\n", "            'Dimensions': [\n", "                {'Name': 'EndpointName', 'Value': endpoint_name }\n", "            ],\n", "            'Statistic': 'Average',\n", "        },\n", "        'ScaleInCooldown': 120,  # Cooldown periods help prevent the endpoint from launching or terminating\n", "                                 # additional instances before the effects of previous activities are visible;\n", "                                 # configure them based on instance startup time and other application needs.\n", "                                 # ScaleInCooldown - The amount of time, in seconds, after a scale-in activity completes before another scale-in activity can start.\n", "        'ScaleOutCooldown': 120  # ScaleOutCooldown - The amount of time, in seconds, after a scale-out activity completes before another scale-out activity can start.\n", "        \n", "        # 'DisableScaleIn': True|False - Indicates whether scale in by the target tracking policy is disabled. 
\n", "        # If the value is true, scale in is disabled and the target tracking policy won't remove capacity from the scalable resource.\n", "    }\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(response)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Trigger 1000 asynchronous invocations with autoscaling from 1 to 5, then scale down to 0 on completion" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Optionally [delete the SNS topics](https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/sns.html#SNS.Client.delete_topic) to avoid being flooded with notifications by the 1000 invocations below " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print(endpoint_name)\n", "for i in range(1000):\n", "    response = sm_runtime.invoke_endpoint_async(\n", "        EndpointName=endpoint_name, \n", "        InputLocation=input_1_s3_location)\n", "print(\"\\nAsync invocations for PyTorch serving with autoscaling\\n\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Plot graphs from CloudWatch metrics" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "cw = boto3.Session().client(\"cloudwatch\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from datetime import datetime, timedelta\n", "def get_sagemaker_metrics(endpoint_name,\n", "                          endpoint_config_name,\n", "                          variant_name,\n", "                          metric_name,\n", "                          statistic,\n", "                          start_time,\n", "                          end_time):\n", "    dimensions = [\n", "        {\n", "            \"Name\": \"EndpointName\",\n", "            \"Value\": endpoint_name\n", "        },\n", "        {\n", "            \"Name\": \"VariantName\",\n", "            \"Value\": variant_name\n", "        }\n", "    ]\n", "    if endpoint_config_name is not None:\n", "        dimensions.append({\n", "            \"Name\": \"EndpointConfigName\",\n", "            \"Value\": endpoint_config_name\n", "        })\n", "    metrics = cw.get_metric_statistics(\n", "        Namespace=\"AWS/SageMaker\",\n", "        MetricName=metric_name,\n", "        StartTime=start_time,\n", "        EndTime=end_time,\n", "        Period=60,\n", "        Statistics=[statistic],\n", "        Dimensions=dimensions\n", "    )\n", "    rename = endpoint_config_name if endpoint_config_name is not None else 'ALL'\n", "    return pd.DataFrame(metrics[\"Datapoints\"])\\\n", "             .sort_values(\"Timestamp\")\\\n", "             .set_index(\"Timestamp\")\\\n", "             .drop([\"Unit\"], axis=1)\\\n", "             .rename(columns={statistic: rename})\n", "\n", "def plot_endpoint_model_latency_metrics(endpoint_name, endpoint_config_name, variant_name, start_time=None):\n", "    start_time = start_time or datetime.now() - timedelta(minutes=60)\n", "    end_time = datetime.now()\n", "    metric_name = \"ModelLatency\"\n", "    statistic = \"Average\"\n", "    metrics_variants = get_sagemaker_metrics(\n", "        endpoint_name,\n", "        endpoint_config_name,\n", "        variant_name,\n", "        metric_name, \n", "        statistic,\n", "        start_time,\n", "        end_time)\n", "    metrics_variants.plot(title=f\"{metric_name}-{statistic}\")\n", "    return metrics_variants" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_latency_metrics = plot_endpoint_model_latency_metrics(endpoint_name, None, \"variant1\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Similarly, we can plot the other CloudWatch metrics associated with the endpoint, as shown below."
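] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The backlog metrics need a direct CloudWatch query rather than the variant-scoped helper above: `ApproximateBacklogSize` and `ApproximateBacklogSizePerInstance` are published per endpoint with only the `EndpointName` dimension. A minimal sketch:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def plot_backlog_metric(endpoint_name, metric_name='ApproximateBacklogSize', start_time=None):\n", "    # Backlog metrics carry only the EndpointName dimension\n", "    start_time = start_time or datetime.now() - timedelta(minutes=60)\n", "    metrics = cw.get_metric_statistics(\n", "        Namespace='AWS/SageMaker',\n", "        MetricName=metric_name,\n", "        StartTime=start_time,\n", "        EndTime=datetime.now(),\n", "        Period=60,\n", "        Statistics=['Average'],\n", "        Dimensions=[{'Name': 'EndpointName', 'Value': endpoint_name}]\n", "    )\n", "    if not metrics['Datapoints']:\n", "        print('No datapoints yet')\n", "        return None\n", "    df = pd.DataFrame(metrics['Datapoints']).sort_values('Timestamp').set_index('Timestamp')\n", "    df['Average'].plot(title=metric_name)\n", "    return df\n", "\n", "backlog_metrics = plot_backlog_metric(endpoint_name)"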
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### CloudWatch metrics - ApproximateBacklogSize and ApproximateBacklogSizePerInstance\n", "The backlog grows from 0 to 1000 when the burst of traffic arrives. The endpoint then autoscales every 120 seconds up to the maximum of 5 instances. The backlog size per instance changes rapidly during autoscaling. At the maximum instance count, the queue backlog drains at about 18 invocations per minute and finally reaches 0." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![title](images/backlog_size_metrics.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The number of invocations successfully processed is about 18 per minute" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![title](images/invocations_metrics.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The model latency for 2 concurrent invocations is approximately 30 seconds" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![title](images/model_latency_metrics.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The instances autoscale down to 0 once the queue size drops to 0" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "![title](images/instance_count_0.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Cleanup" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "If you enabled autoscaling for your endpoint, deregister the endpoint as a scalable target before deleting the endpoint. To do this, uncomment the cell below and run it:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# response = client.deregister_scalable_target(\n", "#     ServiceNamespace='sagemaker',\n", "#     ResourceId=resource_id,\n", "#     ScalableDimension='sagemaker:variant:DesiredInstanceCount'\n", "# )" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Endpoints should be deleted when no longer in use, since (per the [SageMaker pricing page](https://aws.amazon.com/sagemaker/pricing/)) they are billed by time deployed" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm_client.delete_endpoint(EndpointName=endpoint_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "You may also want to delete any other resources you might have created, such as SNS topics, S3 objects, etc." ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }