{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Amazon SageMaker Asynchronous Endpoints - Computer Vision\n", "\n", "---\n", "\n", "본 예제 코드는 [AWS AIML Blog](https://aws.amazon.com/ko/blogs/machine-learning/run-computer-vision-inference-on-large-videos-with-amazon-sagemaker-asynchronous-endpoints/)에 공개된 예제 코드를 기반으로 자체적으로 아래 개선점들을 반영하였습니다.\n", "\n", "- OpenCV 프로세싱 오류 수정\n", "- 로컬 환경 디버깅 예시 및 로컬 모드 예시 추가\n", "- 추론 결괏값 처리 및 시각화 로직 추가\n", "- 마이너 버그 개선\n", "\n", "### References\n", "\n", "- AWS Blog: https://aws.amazon.com/ko/blogs/machine-learning/run-computer-vision-inference-on-large-videos-with-amazon-sagemaker-asynchronous-endpoints/\n", "- GitHub: https://github.com/aws-samples/amazon-sagemaker-asynchronous-inference-computer-vision" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Background" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "처리 시간이 수 분 이상 걸릴 수 있는 대규모 입력 페이로드를 사용하는 유즈케이스가 증가하고 있습니다.\n", "예를 들어, 우주 기술 회사는 고해상도 위성 이미지 스트림을 사용하여 관심 있는 특정 물체를 감지합니다. 유사하게, 의료 회사는 이상을 감지하기 위해 심초음파와 같은 고해상도 생체의학 이미지 또는 비디오를 처리합니다. 또한 미디어 회사는 고객이 업로드한 이미지와 비디오를 스캔하여 규정을 준수하고 저작권 위반이 없는지 확인합니다. 이러한 애플리케이션들은 하루 중 다른 시간에 들어오는 트래픽 버스트를 수신하고 저렴한 비용으로 완료 알림과 함께 실시간에 가까운 처리가 필요합니다.\n", "\n", "이 노트북에서는 대규모 입력 페이로드 비디오의 트래픽 버스트를 처리하는 SageMaker 비동기(asynchronous) 추론 엔드포인트를 배포합니다. 최소 1개 이상의 인스턴스가 필요한 실시간 호스팅 엔드포인트와 달리 비동기 엔드포인트는 인스턴스를 0으로 축소할 수 있습니다. 이 기능을 사용하면 트래픽이 없을 때 인스턴스를 0으로 축소하기에 비용을 절감할 수 있습니다.\n", "\n", "\n", "### Difference between Real-time endpoint and Asynchronous endpoint\n", "비동기식 엔드포인트를 생성하는 프로세스는 실시간 엔드포인트와 동일합니다. (Model 생성 - EndpointConfig 생성 - Endpoint 생성)\n", "\n", "실시간 엔드포인트와의 가장 큰 차이점은 요청 페이로드(request payload)를 인라인으로 전달하는 대신 페이로드를 Amazon S3에 업로드하고 Amazon S3 URI를 요청의 일부로 전달합니다. 요청을 받으면 SageMaker는 처리 후 결과가 저장될 출력 경로가 포함된 토큰을 제공합니다. 내부적으로 SageMaker는 이러한 요청이 있는 대기열을 자동으로 유지 관리하고 처리합니다. 또한, 엔드포인트를 생성하는 동안 선택적으로 Amazon SNS topic을 지정하여 성공 또는 오류 알림을 받을 수 있습니다. 추론 요청이 성공적으로 처리되었다는 알림을 받으면 Amazon S3를 통해 결괏값을 확인할 수 있습니다." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "## 0. Pre-requisites\n", "---\n", "\n", "### Setup" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%load_ext autoreload\n", "%autoreload 2" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import torch\n", "import torchvision\n", "import torchvision.models as models\n", "import sagemaker\n", "from sagemaker import get_execution_role\n", "from sagemaker.utils import name_from_base\n", "from sagemaker.pytorch import PyTorchModel\n", "import boto3\n", "import datetime\n", "import time\n", "from time import strftime,gmtime\n", "import json\n", "import os\n", "import urllib\n", "import sys\n", "import io\n", "import torchvision.transforms as transforms\n", "\n", "role = get_execution_role()\n", "boto_session = boto3.session.Session()\n", "sm_session = sagemaker.session.Session()\n", "sm_client = boto_session.client(\"sagemaker\")\n", "sm_runtime = boto_session.client(\"sagemaker-runtime\")\n", "sns_client = boto3.client('sns')\n", "region = boto_session.region_name\n", "bucket = sm_session.default_bucket()\n", "prefix = 'async-inference-maskrcnn'\n", "\n", "print(region)\n", "print(role)\n", "print(bucket)\n", "print(prefix)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### IAM Role " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "AWS IAM 콘솔(https://console.aws.amazon.com/iam/home)로 이동하여 IAM 역할(role)에 다음 정책(policy)을 추가합니다.\n", "\n", "* SageMakerFullAccessPolicy\n", "* Amazon S3 access: GetObject/PutObject 권한을 적용합니다. `bucket_name`을 여러분의 Amazon S3 버킷의 이름으로 변경하세요.\n", "```json\n", "{\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Action\": [\n", " \"s3:GetObject\",\n", " \"s3:PutObject\",\n", " \"s3:AbortMultipartUpload\",\n", " \"s3:ListBucket\"\n", " ],\n", " \"Effect\": \"Allow\",\n", " \"Resource\": \"arn:aws:s3:::bucket_name/*\"\n", " }\n", " ]\n", "}\n", "```\n", "\n", "* (Optional) Amazon SNS access: 정의한 토픽에 `sns:Publish`를 추가하세요. Amazon SNS를 사용하여 알림을 수신하려는 경우 적용합니다.\n", "\n", "```json\n", "{\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Action\": [\n", " \"sns:Publish\"\n", " ],\n", " \"Effect\": \"Allow\",\n", " \"Resource\": \"arn:aws:sns:us-east-2:123456789012:MyTopic\"\n", " }\n", " ]\n", "}\n", "```\n", "\n", "* (Optional) KMS decrypt, encrypt if your Amazon S3 bucket is encrypte." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Inference Script\n", "\n", "SageMaker 추론 스크립트는 SageMaker 상에서 MXNet에 최적화된 추론 서버인 MMS(Multi Model Server)나 PyTorch에 최적화된 추론 서버인 torchserve를 쉽고 편하게 배포할 수 있는 high-level 툴킷인 SageMaker inference toolkit의 인터페이스를 사용하고 있으며, 여러분께서는 인터페이스에 정의된 핸들러(handler) 함수들만 구현하시면 됩니다. MXNet 및 PyTorch용 엔트리포인트(entrypoint) 인터페이스는 아래 두 가지 옵션 중 하나를 선택하면 되며, 본 예제에서는 Option 2.의 사용 예시를 보여줍니다.\n", "\n", "\n", "#### Option 1.\n", "- `model_fn(model_dir)`: 딥러닝 네트워크 아키텍처를 정의하고 S3의 model_dir에 저장된 모델 아티팩트를 로드합니다.\n", "- `input_fn(request_body, content_type)`: 입력 데이터를 전처리합니다. (예: request_body로 전송된 bytearray 배열을 PIL.Image로 변환 수 cropping, resizing, normalization등의 전처리 수행). content_type은 입력 데이터 종류에 따라 다양하게 처리 가능합니다. (예: application/x-npy, application/json, application/csv 등)\n", "- `predict_fn(input_object, model)`: input_fn을 통해 들어온 데이터에 대해 추론을 수행합니다.\n", "- `output_fn(prediction, accept_type)`: predict_fn에서 받은 추론 결과를 추가 변환을 거쳐 프론트 엔드로 전송합니다.\n", "\n", "#### Option 2.\n", "- `model_fn(model_dir)`: 딥러닝 네트워크 아키텍처를 정의하고 S3의 model_dir에 저장된 모델 아티팩트를 로드합니다.\n", "- `transform_fn(model, request_body, content_type, accept_type)`: input_fn(), predict_fn(), output_fn()을 transform_fn()으로 통합할 수 있습니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile src/inference.py\n", "\n", "import cv2\n", "import json\n", "import torch\n", "import torchvision.transforms as transforms\n", "from six import BytesIO\n", "import io\n", "import numpy as np\n", "import tempfile\n", "\n", "device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')\n", "\n", "def video2frame(file_path, frame_width, frame_height, interval):\n", " \"\"\"\n", " Extract frame from video by interval\n", " :param video_src_path: video src path\n", " :param video: video file name\n", " :param frame_width: frame width\n", " :param frame_height: frame height\n", " :param interval: interval for frame to extract\n", " :return: list of numpy.ndarray \n", " \"\"\"\n", " video_frames = []\n", " cap = cv2.VideoCapture(file_path)\n", " frame_index = 0\n", " frame_count = 0\n", " if cap.isOpened():\n", " success = True\n", " else:\n", " success = False\n", " print(\"Read failed!\")\n", "\n", " while success:\n", " success, frame = cap.read()\n", " if frame_index % interval == 0:\n", " print(\"---> Reading the %d frame:\" % frame_index, success)\n", " frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)\n", " resize_frame = cv2.resize(\n", " frame, (frame_width, frame_height), interpolation=cv2.INTER_AREA\n", " )\n", " video_frames.append(resize_frame)\n", " frame_count += 1\n", "\n", " frame_index += 1\n", "\n", " cap.release()\n", " print(f'Total frames={frame_index}, Number of extracted frames={frame_count}')\n", " return video_frames\n", "\n", "\n", "def model_fn(model_dir):\n", " '''\n", " Loads the model into memory from storage and return the model.\n", " ''' \n", " model = torch.load(model_dir + '/model.pth', map_location=torch.device(device))\n", " model = model.eval()\n", " return model\n", "\n", "\n", "def input_fn(request_body, request_content_type=None):\n", " frame_width = 256\n", " frame_height = 256\n", " interval = 30\n", " print(\"content_type=\")\n", " print(request_content_type)\n", " \n", " f = io.BytesIO(request_body)\n", " with tempfile.NamedTemporaryFile(delete=False) as tfile:\n", " tfile.write(f.read())\n", " filename = tfile.name\n", " \n", " video_frames = video2frame(filename, frame_width, frame_height, interval) \n", " return video_frames\n", "\n", "\n", "def predict_fn(video_frames, model):\n", " transform = transforms.Compose([\n", " transforms.Lambda(lambda video_frames: torch.stack([transforms.ToTensor()(frame) for frame in video_frames])) # returns a 4D tensor\n", " ])\n", " image_tensors = transform(video_frames).to(device)\n", " \n", " with torch.no_grad():\n", " output = model(image_tensors)\n", " return output\n", "\n", "\n", "def output_fn(output_batch, accept='application/json'):\n", " res = []\n", " \n", " print(f'output list length={len(output_batch)}')\n", " for output in output_batch:\n", " boxes = output['boxes'].detach().cpu().numpy()\n", " labels = output['labels'].detach().cpu().numpy()\n", " scores = output['scores'].detach().cpu().numpy()\n", " masks = output['masks'].detach().cpu().numpy()\n", " masks = np.squeeze(masks.transpose(2,3,0,1)) # 4D(batch x 1 height x width) to 3D(height x width x batch)\n", " \n", " res.append({\n", " 'boxes': boxes.tolist(),\n", " 'labels': labels.tolist(),\n", " 'scores': scores.tolist(),\n", " 'masks': masks.tolist() \n", " })\n", " \n", " return json.dumps(res)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Visualization\n", "\n", "Instance Segmentation 결괏값을 시각화합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%writefile src/visualize.py\n", "\n", "import numpy as np\n", "import colorsys\n", "from skimage.measure import find_contours\n", "import random\n", "import matplotlib.pyplot as plt\n", "from matplotlib import patches, lines\n", "from matplotlib.patches import Polygon\n", "\n", "def get_label_map(label_file):\n", " label_map = {}\n", " labels = open(label_file, 'r')\n", " \n", " for line in labels:\n", " line = line.rstrip(\"\\n\")\n", " ids = line.split(',')\n", " label_map[int(ids[0])] = ids[2] \n", " \n", " return label_map\n", "\n", "\n", "def random_colors(N, bright=False):\n", " \"\"\"\n", " Generate random colors.\n", " To get visually distinct colors, generate them in HSV space then\n", " convert to RGB.\n", " \"\"\" \n", " brightness = 1.0 if bright else 0.7\n", " hsv = [(i / N, 1, brightness) for i in range(N)]\n", " colors = list(map(lambda c: colorsys.hsv_to_rgb(*c), hsv))\n", " random.shuffle(colors)\n", " return colors\n", "\n", "\n", "def apply_mask(image, mask, color, alpha=0.3):\n", " \"\"\"Apply the given mask to the image.\n", " \"\"\"\n", " for c in range(3):\n", " image[:, :, c] = np.where(mask == 1,\n", " image[:, :, c] *\n", " (1 - alpha) + alpha * color[c] * 255,\n", " image[:, :, c])\n", " return image\n", "\n", "\n", "def display_instances(image, boxes, masks, class_ids, class_names,\n", " scores=None, title=\"\", \n", " score_thres=0.5, mask_thres=0.5,\n", " figsize=(10, 10), ax=None,\n", " show_mask=True, show_bbox=True,\n", " colors=None, framework='pytorch'):\n", " \"\"\"\n", " boxes: [num_instance, (x1, y1, x2, y2, class_id)] in image coordinates.\n", " masks: [height, width, num_instances]\n", " class_ids: [num_instances]\n", " class_names: list of class names of the dataset\n", " scores: (optional) confidence scores for each box\n", " title: (optional) Figure title\n", " score_thres: To return only objects whose score is greater than to a certain value in the detected result.\n", " mask_thres: Threshold for binarizing the mask image\n", " figsize: (optional) the size of the image\n", " show_mask, show_bbox: To show masks and bounding boxes or not \n", " colors: (optional) An array or colors to use with each object\n", " framework: pytorch/mxnet\n", " \"\"\"\n", " \n", " if framework == 'mxnet':\n", " boxes = boxes.asnumpy()\n", " masks = masks.asnumpy()\n", " scores = scores.asnumpy() \n", " else:\n", " boxes = np.array(boxes)\n", " masks = np.array(masks) \n", " scores = np.array(scores) \n", " \n", " # Get only results that are above the threshold. Default threshold is 0.5. \n", " scores = scores[scores > score_thres]\n", " # Number of instances\n", " N = len(scores)\n", "\n", " if not N:\n", " print(\"\\n*** No instances to display *** \\n\")\n", "\n", " # If no axis is passed, create one and automatically call show()\n", " auto_show = False\n", " if not ax:\n", " _, ax = plt.subplots(1, figsize=figsize)\n", " auto_show = True\n", "\n", " # Generate random colors\n", " colors = colors or random_colors(N)\n", "\n", " # Show area outside image boundaries.\n", " height, width = image.shape[:2]\n", " ax.set_ylim(height + 10, -10)\n", " ax.set_xlim(-10, width + 10)\n", " ax.axis('off')\n", " ax.set_title(title)\n", " masked_image = image.astype(np.uint32).copy()\n", " \n", " for i in range(N):\n", " color = colors[i]\n", "\n", " # Bounding box\n", " if not np.any(boxes[i]):\n", " # Skip this instance. Has no bbox. Likely lost in image cropping.\n", " continue\n", " x1, y1, x2, y2 = boxes[i]\n", " \n", " if show_bbox:\n", " p = patches.Rectangle((x1, y1), x2 - x1, y2 - y1, linewidth=2,\n", " alpha=0.7, linestyle=\"dashed\",\n", " edgecolor=color, facecolor='none')\n", " ax.add_patch(p)\n", "\n", " # Label\n", " class_id = class_ids[i]\n", " score = scores[i] if scores is not None else None\n", " #predicted_class = class_info[int(cls_pred)]\n", " label = class_names[int(class_id)]\n", " caption = \"{} {:.3f}\".format(label, score) if score else label\n", " ax.text(x1, y1, caption, color='w', verticalalignment='top',\n", " size=12, bbox={'color': color, 'pad': 0}) \n", "\n", " # Mask\n", " mask = (masks[:, :, i] > mask_thres) * 1\n", " if show_mask:\n", " masked_image = apply_mask(masked_image, mask, color)\n", "\n", " # Mask Polygon\n", " # Pad to ensure proper polygons for masks that touch image edges.\n", " padded_mask = np.zeros(\n", " (mask.shape[0] + 2, mask.shape[1] + 2), dtype=np.uint8)\n", "\n", " padded_mask[1:-1, 1:-1] = mask\n", " contours = find_contours(padded_mask, 0.5)\n", " for verts in contours:\n", " # Subtract the padding and flip (y, x) to (x, y)\n", " verts = np.fliplr(verts) - 1\n", " p = Polygon(verts, facecolor=\"none\", edgecolor=color)\n", " ax.add_patch(p)\n", " ax.imshow(masked_image.astype(np.uint8))\n", " if auto_show:\n", " plt.show()\n", " \n", " #return masked_image" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "## 1. Local Endpoint Inference\n", "---\n", "\n", "먼저, 로컬 개발 환경에서 추론 결과를 확인합니다. 로컬에서 충분히 디버깅 후 호스팅 엔드포인트로 배포해야 합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from src.visualize import get_label_map, display_instances\n", "from src.inference import model_fn, input_fn, predict_fn, output_fn" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "사전 훈련된 ResNet-50 기반 Mask-RCNN 모델을 사용합니다. 여러분의 모델을 준비하셔도 됩니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model = models.detection.maskrcnn_resnet50_fpn(pretrained=True)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Download Sample Movie Clip\n", "GluonCV의 샘플 무비클립을 다운로드합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "video_path = 'https://raw.githubusercontent.com/dmlc/web-data/master/gluoncv/tracking/Coke.mp4'\n", "!rm -rf videos & mkdir videos \n", "video_local_path = 'videos/Coke.mp4'\n", "!wget $video_path -O $video_local_path\n", "\n", "with open(video_local_path, mode='rb') as file:\n", " request_body = bytearray(file.read())" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Save Model\n", "모델을 로컬 폴더로 저장합니다.\n", "\n", "**[참고]** 사전 훈련된 모델을 fine-tuning 없이 사용하는 경우 0바이트의 `model.tar.gz`로 아카이빙하는 것도 가능합니다. 물론 이 경우는 인터넷 연결이 활성화되어야 합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_path = 'model'\n", "!rm -rf $model_path & mkdir $model_path\n", "torch.save(model, f'{model_path}/model.pth') " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Local Inference without Endpoint\n", "SageMaker 호스팅 인스턴스로 배포하기 전, SageMaker 추론에서 사용하는 인터페이스로 충분히 테스트를 수행합니다. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model = model_fn(model_path)\n", "video_frames = input_fn(request_body)\n", "preds = predict_fn(video_frames, model)\n", "output = output_fn(preds)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Visualization\n", "자체 구현한 시각화 함수로 Mask R-CNN의 Instance Segmentation 결괏값을 출력합니다. 예제 코드에서는 첫번째 프레임 결과만 확인하지만 모든 프레임에 대한 결괏값을 확인할 수도 있습니다. " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "output_json = json.loads(output)\n", "label_map = get_label_map('src/coco_labels.txt')\n", "\n", "display_instances(\n", " image=video_frames[0], \n", " boxes=output_json[0]['boxes'], \n", " masks=output_json[0]['masks'],\n", " class_ids=output_json[0]['labels'], \n", " class_names=label_map,\n", " scores=output_json[0]['scores']\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create model archive and upload to S3\n", "\n", "SageMaker 호스팅 인스턴스 배포를 위해 모델 파라메터와 소스 코드를 model.tar.gz로 아카이빙하여 S3로 복사합니다.\n", "PyTorch 1.2 버전부터는 아래 규칙으로 아카이빙해야 합니다.\n", "\n", "```\n", "model.tar.gz/\n", "|- model.pth\n", "|- code/\n", " |- inference.py\n", " |- requirements.txt # only for versions 1.3.1 and higher\n", "```\n", "\n", "참고로, SageMaker Python SDK를 사용하는 경우, 인자값에 소스 코드 디렉토리와 소스 코드를 명시해 주면 자동으로 모델 파라메터와 소스 코드를 아카이빙해 줍니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "model_artifact_path = 'model_and_code'\n", "\n", "!rm -rf $model_artifact_path\n", "!mkdir $model_artifact_path\n", "!mkdir $model_artifact_path/code\n", "torch.save(model, f'{model_artifact_path}/model.pth')\n", "!cp ./src/* $model_artifact_path/code\n", "!tar cvzf model.tar.gz -C $model_artifact_path/ . " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.s3 import S3Uploader\n", "file_key = 'model.tar.gz'\n", "model_artifact = S3Uploader.upload(file_key,'s3://{}/{}/model'.format(bucket, prefix))\n", "print(model_artifact)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "## 2. (Optional - SageMaker Only) Local Mode \n", "---\n", "\n", "충분한 검증 및 테스트 없이 훈련된 모델을 곧바로 실제 운영 환경에 배포하기에는 많은 위험 요소들이 있습니다. 따라서, 로컬 모드를 사용하여 실제 운영 환경에 배포하기 위한 추론 인스턴스를 시작하기 전에 노트북 인스턴스의 로컬 환경에서 모델을 배포하는 것을 권장합니다. 이를 로컬 모드 엔드포인트(Local Mode Endpoint)라고 합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "local_model_path = f'file://{os.getcwd()}/model.tar.gz'\n", "\n", "model = PyTorchModel(model_data=local_model_path,\n", " role=role,\n", " entry_point='inference.py', \n", " source_dir='src',\n", " framework_version='1.7.1',\n", " py_version='py3')\n", "\n", "predictor = model.deploy(\n", " initial_instance_count=1,\n", " instance_type='local'\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create SageMaker model with PyTorch inference container" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "local_sm_runtime = sagemaker.local.LocalSagemakerRuntimeClient()\n", "local_endpoint_name = model.endpoint_name\n", "\n", "feed_data = open('images/birds.jpg', 'rb')\n", "response = local_sm_runtime.invoke_endpoint(\n", " EndpointName=local_endpoint_name, \n", " Body=feed_data\n", ")\n", "output_json = json.loads(response['Body'].read())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import numpy as np\n", "from PIL import Image\n", "img = Image.open('images/birds.jpg').resize((256, 256))\n", "\n", "display_instances(\n", " image=np.array(img), \n", " boxes=output_json[0]['boxes'], \n", " masks=output_json[0]['masks'],\n", " class_ids=output_json[0]['labels'], \n", " class_names=label_map,\n", " scores=output_json[0]['scores']\n", ")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "predictor.delete_endpoint()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "## 3. Create Asynchronous inference endpoint\n", "---\n", "\n", "비동기 엔드포인트를 생성합니다. 생성 방법은 기존 실시간 호스팅 엔드포인트와 거의 동일하며, EndpointConfig 생성 시 AsyncInferenceConfig만 명시해 주시면 됩니다." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### (Optional) Create Error and Success SNS topics" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sns_client.create_topic(Name=\"Async-Demo-ErrorTopic\")\n", "error_topic= response['TopicArn']\n", "print(error_topic)\n", "\n", "response = sns_client.create_topic(Name=\"Async-Demo-SuccessTopic\")\n", "success_topic = response['TopicArn']\n", "print(success_topic)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sns_client.list_topics()\n", "topics = response[\"Topics\"]\n", "print(topics)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Note: Replace with your email id\n", "\n", "email_id = 'YOUR-EMAIL'\n", "email_sub_1 = sns_client.subscribe(\n", " TopicArn=success_topic,\n", " Protocol='email',\n", " Endpoint=email_id)\n", "\n", "email_sub_2 = sns_client.subscribe(\n", " TopicArn=error_topic,\n", " Protocol='email',\n", " Endpoint=email_id)\n", "\n", "#Note: You will need to confirm by clicking on the email you recieve to complete the subscription" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Model" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.image_uris import retrieve\n", "\n", "deploy_instance_type = 'ml.g4dn.xlarge'\n", "pytorch_inference_image_uri = retrieve(\n", " 'pytorch',\n", " region,\n", " version='1.7.1',\n", " py_version='py3',\n", " instance_type = deploy_instance_type,\n", " accelerator_type=None,\n", " image_scope='inference'\n", ")\n", "\n", "container = pytorch_inference_image_uri\n", "model_name = 'sagemaker-maskrcnn-{0}'.format(str(int(time.time())))\n", "\n", "create_model_response = sm_client.create_model(\n", " ModelName = model_name,\n", " ExecutionRoleArn = role,\n", " PrimaryContainer = {\n", " 'Image': container,\n", " 'ModelDataUrl': model_artifact,\n", " 'Environment': {\n", " 'TS_MAX_REQUEST_SIZE': '100000000', #default max request size is 6 Mb for torchserve, need to update it to support the 70 mb input payload\n", " 'TS_MAX_RESPONSE_SIZE': '100000000',\n", " 'TS_DEFAULT_RESPONSE_TIMEOUT': '1000'\n", " }\n", " }, \n", ")\n", "\n", "print(f\"Created Model: {create_model_response['ModelArn']}\")\n", "print(container)\n", "print(model_name) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create EndpointConfig\n", "\n", "모델을 생성했으면, EndpointConfig를 생성합니다. Amazon SageMaker 호스팅 서비스는 EndpointConfig를을 사용하여 모델을 배포하며 비동기 추론 시에는 AsyncInferenceConfig 인자값에 OutputConfig에 대한 출력 Amazon S3 경로를 지정합니다. 또한, 선택적으로 추론 결괏값에 대한 알림을 보낼 Amazon SNS 주제를 지정할 수 있습니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "bucket_prefix = \"async-inference-example\"\n", "endpoint_config_name = f\"PyTorchAsyncEndpointConfig-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}\"\n", "create_endpoint_config_response = sm_client.create_endpoint_config(\n", " EndpointConfigName=endpoint_config_name,\n", " ProductionVariants=[\n", " {\n", " \"VariantName\": \"variant1\",\n", " \"ModelName\": model_name,\n", " \"InstanceType\": \"ml.g4dn.xlarge\",\n", " \"InitialInstanceCount\": 1\n", " }\n", " ],\n", " AsyncInferenceConfig={\n", " \"OutputConfig\": {\n", " \"S3OutputPath\": f\"s3://{bucket}/{bucket_prefix}/output\",\n", " # Optionally specify Amazon SNS topics\n", "# \"NotificationConfig\": {\n", "# \"SuccessTopic\": success_topic,\n", "# \"ErrorTopic\": error_topic,\n", "# }\n", " },\n", " \"ClientConfig\": {\n", " \"MaxConcurrentInvocationsPerInstance\": 2\n", " }\n", " }\n", ")\n", "print(f\"Created EndpointConfig: {create_endpoint_config_response['EndpointConfigArn']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Endpoint\n", "\n", "SageMaker 엔드포인트를 생성합니다. 호스팅 인스턴스를 프로비저닝하는 데 약 5분에서 10분의 시간이 소요됩니다.\n", "\n", "**[Note]** 향후 오토스케일링이나 AB 테스트 설정 변경 시에 엔드포인트를 재배포할 필요가 없이 신규 EndpointConfig 생성 후 update_endpoint()로 엔드포인트를 업데이트하면 됩니다. 아래 코드 예시를 참조하세요.\n", "\n", "```\n", "sm_client.create_endpoint_config(...)\n", "response = sm_client.update_endpoint(\n", " EndpointName=endpoint_name,\n", " EndpointConfigName=endpoint_config_name\n", ")\n", "```" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "endpoint_name = f\"sm-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}\"\n", "create_endpoint_response = sm_client.create_endpoint(EndpointName=endpoint_name, EndpointConfigName=endpoint_config_name)\n", "print(f\"Creating Endpoint: {create_endpoint_response['EndpointArn']}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from IPython.core.display import display, HTML\n", "\n", "def make_endpoint_link(region, endpoint_name, endpoint_task):\n", " endpoint_link = f'{endpoint_task} Review Endpoint' \n", " return endpoint_link \n", " \n", "endpoint_link = make_endpoint_link(region, endpoint_name, '[Asynchronous Endpoint]')\n", "display(HTML(endpoint_link))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "waiter = boto3.client('sagemaker').get_waiter('endpoint_in_service')\n", "print(\"Waiting for endpoint to create...\")\n", "waiter.wait(EndpointName=endpoint_name)\n", "resp = sm_client.describe_endpoint(EndpointName=endpoint_name)\n", "print(f\"Endpoint Status: {resp['EndpointStatus']}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "## 4. Asynchronous Inference\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Upload input video file" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def upload_file(bucket, bucket_prefix, input_location):\n", " prefix = f\"{bucket_prefix}/input\"\n", " return sm_session.upload_data(\n", " input_location, \n", " bucket=bucket,\n", " key_prefix=prefix, \n", " extra_args={\"ContentType\": \"video/mp4\"})" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "input_1_location = \"videos/Coke.mp4\"\n", "input_1_s3_location = upload_file(bucket, bucket_prefix, input_1_location)\n", "print(input_1_s3_location)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Invoke asynchronous endpoint" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm_client = boto_session.client(\"sagemaker\")\n", "sm_runtime = boto_session.client(\"sagemaker-runtime\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "response = sm_runtime.invoke_endpoint_async(\n", " EndpointName=endpoint_name, \n", " InputLocation=input_1_s3_location\n", ")\n", "output_location = response['OutputLocation']\n", "print(f\"OutputLocation: {output_location}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "from botocore.exceptions import ClientError\n", "\n", "def get_output(output_location):\n", " output_url = urllib.parse.urlparse(output_location)\n", " bucket = output_url.netloc\n", " key = output_url.path[1:]\n", " while True:\n", " try:\n", " return sm_session.read_s3_file(bucket=output_url.netloc, key_prefix=output_url.path[1:])\n", " except ClientError as e:\n", " if e.response['Error']['Code'] == 'NoSuchKey':\n", " print(\"waiting for output...\")\n", " time.sleep(10)\n", " continue\n", " raise\n", "\n", "def convert_size(size_bytes):\n", " import math\n", " if size_bytes == 0:\n", " return \"0B\"\n", " size_name = (\"B\", \"KB\", \"MB\", \"GB\", \"TB\", \"PB\", \"EB\", \"ZB\", \"YB\")\n", " i = int(math.floor(math.log(size_bytes, 1024)))\n", " p = math.pow(1024, i)\n", " s = round(size_bytes / p, 2)\n", " return \"%s %s\" % (s, size_name[i])\n", "\n", "output = get_output(output_location)\n", "output_size_bytes = sys.getsizeof(output)\n", "print(f\"Output size={output_size_bytes} bytes, {convert_size(output_size_bytes)}\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "output_json = json.loads(output)\n", "num_results = len(output_json)\n", "label_map = get_label_map('src/coco_labels.txt')\n", "\n", "display_instances(\n", " image=video_frames[0], \n", " boxes=output_json[0]['boxes'], \n", " masks=output_json[0]['masks'],\n", " class_ids=output_json[0]['labels'], \n", " class_names=label_map,\n", " scores=output_json[0]['scores']\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Trigger 10 asynchronous requests on a single instance " ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "inferences = []\n", "\n", "for i in range(1,10):\n", " start = time.time()\n", " response = sm_runtime.invoke_endpoint_async(\n", " EndpointName=endpoint_name, \n", " InputLocation=input_1_s3_location)\n", " output_location = response[\"OutputLocation\"]\n", " inferences += [(input_1_s3_location, output_location)]\n", " time.sleep(0.5)\n", "print(\"\\Async invocations for Pytorch serving default: \\n\")\n", "\n", "for input_file, output_location in inferences:\n", " output = get_output(output_location)\n", " print(f\"Input File: {input_file}, Output location: {output_location}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "## 5. (Optional) AutoScaling\n", "---\n", "[주의] 본 섹션은 필수가 아니기에 선택적으로 진행하세요. " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### (Optional) Setup AutoScaling policy\n", "\n", "Application Autoscaling을 활성화하여 비동기 엔드포인트에서 Autoscaling을 구성합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "enable_autoscaling = False \n", "\n", "if enable_autoscaling:\n", " client = boto3.client('application-autoscaling') # Common class representing Application Auto Scaling for SageMaker amongst other services\n", "\n", " resource_id = 'endpoint/' + endpoint_name + '/variant/' + 'variant1' # This is the format in which application autoscaling references the endpoint\n", "\n", " # Configure Autoscaling on asynchronous endpoint down to zero instances\n", " response = client.register_scalable_target(\n", " ServiceNamespace='sagemaker', \n", " ResourceId=resource_id,\n", " ScalableDimension='sagemaker:variant:DesiredInstanceCount',\n", " MinCapacity=0, \n", " MaxCapacity=5\n", " )\n", "\n", " response = client.put_scaling_policy(\n", " PolicyName='Invocations-ScalingPolicy',\n", " ServiceNamespace='sagemaker', # The namespace of the AWS service that provides the resource. \n", " ResourceId=resource_id, # Endpoint name \n", " ScalableDimension='sagemaker:variant:DesiredInstanceCount', # SageMaker supports only Instance Count\n", " PolicyType='TargetTrackingScaling', # 'StepScaling'|'TargetTrackingScaling'\n", " TargetTrackingScalingPolicyConfiguration={\n", " 'TargetValue': 5.0, # The target value for the metric. \n", " 'CustomizedMetricSpecification': {\n", " 'MetricName': 'ApproximateBacklogSizePerInstance',\n", " 'Namespace': 'AWS/SageMaker',\n", " 'Dimensions': [\n", " {'Name': 'EndpointName', 'Value': endpoint_name }\n", " ],\n", " 'Statistic': 'Average',\n", " },\n", " 'ScaleInCooldown': 120, # The cooldown period helps you prevent your Auto Scaling group from launching or terminating \n", " # additional instances before the effects of previous activities are visible. \n", " # You can configure the length of time based on your instance startup time or other application needs.\n", " # ScaleInCooldown - The amount of time, in seconds, after a scale in activity completes before another scale in activity can start. \n", " 'ScaleOutCooldown': 120 # ScaleOutCooldown - The amount of time, in seconds, after a scale out activity completes before another scale out activity can start.\n", "\n", " # 'DisableScaleIn': True|False - Indicates whether scale in by the target tracking policy is disabled. \n", " # If the value is true , scale in is disabled and the target tracking policy won't remove capacity from the scalable resource.\n", " }\n", " )\n", " print(response) " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### (Option) Trigger 1000 asynchronous invocations with autoscaling from 1 to 5 and then scale down to 0 on completion" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "만약 SNS topic을 활성화했다면 반드시 [delete the SNS topic](https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/sns.html#SNS.Client.delete_topic) 를 참조하여 SNS를 삭제해 주세요." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if enable_autoscaling:\n", " print(endpoint_name)\n", " for i in range(1,1000):\n", " response = sm_runtime.invoke_endpoint_async(\n", " EndpointName=endpoint_name, \n", " InputLocation=input_1_s3_location)\n", " print(\"\\Async invocations for Pytorch serving with auotscaling \\n\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Plot graphs from CloudWatch Metrics\n", "\n", "트래픽 버스트가 호출되면 백로그가 0에서 1000으로 증가합니다. 그런 다음 비동기 추론 엔드포인트는 최대 인스턴스 수(예제 코드의 경우 5개)까지 120초마다 오토스케일링됩니다. 인스턴스당 백로그 크기(Backlog size per instance)는 오토스케일링 중에 빠르게 변경됩니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import datetime\n", "import pandas as pd\n", "from datetime import datetime,timedelta\n", "cw = boto3.Session().client(\"cloudwatch\")\n", "\n", "def get_sagemaker_metrics(endpoint_name,\n", " endpoint_config_name,\n", " variant_name,\n", " metric_name,\n", " statistic,\n", " start_time,\n", " end_time):\n", " dimensions = [\n", " {\n", " \"Name\": \"EndpointName\",\n", " \"Value\": endpoint_name\n", " },\n", " {\n", " \"Name\": \"VariantName\",\n", " \"Value\": variant_name\n", " }\n", " ]\n", " if endpoint_config_name is not None:\n", " dimensions.append({\n", " \"Name\": \"EndpointConfigName\",\n", " \"Value\": endpoint_config_name\n", " })\n", " metrics = cw.get_metric_statistics(\n", " Namespace=\"AWS/SageMaker\",\n", " MetricName=metric_name,\n", " StartTime=start_time,\n", " EndTime=end_time,\n", " Period=60,\n", " Statistics=[statistic],\n", " Dimensions=dimensions\n", " )\n", " rename = endpoint_config_name if endpoint_config_name is not None else 'ALL'\n", " return pd.DataFrame(metrics[\"Datapoints\"])\\\n", " .sort_values(\"Timestamp\")\\\n", " .set_index(\"Timestamp\")\\\n", " .drop([\"Unit\"], axis=1)\\\n", " .rename(columns={statistic: rename})\n", "\n", "def plot_endpoint_model_latency_metrics(endpoint_name, endpoint_config_name, variant_name, start_time=None):\n", " start_time = start_time or datetime.now() - timedelta(minutes=60)\n", " end_time = datetime.now()\n", " metric_name = \"ModelLatency\"\n", " statistic = \"Average\"\n", " metrics_variants = get_sagemaker_metrics(\n", " endpoint_name,\n", " endpoint_config_name,\n", " variant_name,\n", " metric_name, \n", " statistic,\n", " start_time,\n", " end_time)\n", " metrics_variants.plot(title=f\"{metric_name}-{statistic}\")\n", " return metrics_variants\n", "\n", "if enable_autoscaling:\n", " model_latency_metrics = plot_endpoint_model_latency_metrics(endpoint_name, None, \"variant1\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "
\n", "\n", "## Clean up\n", "---" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "엔드포인트에 대해 오토스케일링을 활성화한 경우, 엔드포인트를 삭제하기 전 `deregister_scalable_target()`로 확장 가능한 대상을 해지해야 합니다." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "if enable_autoscaling:\n", " response = client.deregister_scalable_target(\n", " ServiceNamespace='sagemaker',\n", " ResourceId=resource_id,\n", " ScalableDimension='sagemaker:variant:DesiredInstanceCount'\n", " )\n", " print(response)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "불필요한 과금을 방지하기 위해 엔드포인트를 삭제합니다. SNS topic, S3 object 등의 리소스도 필요하지 않다면 삭제해 주세요." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sm_client.delete_endpoint(EndpointName=endpoint_name)\n", "sm_client.delete_model(ModelName=model_name)" ] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "conda_pytorch_latest_p37", "language": "python", "name": "conda_pytorch_latest_p37" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.12" } }, "nbformat": 4, "nbformat_minor": 4 }