{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Text2Image, Visual Grounding, Image Caption, and Visual Question and Answer using OFA\n", "## Based on the [OFA model](https://huggingface.co/OFA-Sys/OFA-large)\n", "### Code by Suresh Poopandi, October 2022" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "From [OFA GitHub](https://github.com/OFA-Sys/OFA):\n", "\n", "> OFA is a unified sequence-to-sequence pre-trained model (support English and Chinese) that unifies modalities (i.e., cross-modality, vision, language) and tasks (finetuning and prompt tuning are supported): image captioning (1st at the MSCOCO Leaderboard), VQA, visual grounding, text-to-image generation, text classification, text generation, image classification, etc. We provide step-by-step instructions for pretraining and fine tuning and corresponding checkpoints (check official ckpt [EN|CN] or huggingface ckpt).\n", "\n", "This notebook walks through deploying the OFA model to an Amazon SageMaker endpoint. We start by downloading the model and testing it locally; this is much faster with a GPU, although the test code falls back to the CPU when no GPU is available. Next, we create a custom Docker container that holds the model and the inference code. For captioning, VQA, and visual grounding, the endpoint reads the input image from an S3 bucket and returns the model output as JSON; when a task produces an image, the endpoint also writes that image directly to the same bucket.\n", "\n", "Note that this model is large. Use an instance type with at least 15GB of storage so there is room to download the model and package the container. This notebook was tested on an ml.g4dn.2xlarge with 25GB of EBS storage. You can also run it on CPU-based instances, but inference will be considerably slower.\n", "\n", "The notebook follows these basic steps:\n", "1. Install Dependencies (for local testing)\n", "2. Test the model locally\n", "3. Create a custom inference script\n", "4. Create a unit test file for the inference script\n", "5. 
Create a custom Docker container for the model and inference script\n", "6. Test the Docker container locally\n", "7. Define and Deploy the model\n", "8. Test the new endpoint\n", "9. Clean up resources\n", "\n", "References:\n", " * [OFA Model on Hugging Face](https://huggingface.co/OFA-Sys/OFA-large)\n", "\n", "Container structure: your local directory should look like this so that everything is packaged correctly into the container ([reference](https://sagemaker-workshop.com/custom/containers.html)). You will already have this structure if you cloned the git repo; if not, following the directions in this notebook rebuilds the structure and all required files.\n", "- This Notebook\n", "- container\n", "  - OFA\n", "    - predictor.py: Flask app for inference, our custom inference code\n", "    - wsgi.py: Wrapper around predictor\n", "    - nginx.conf: Config for nginx front-end\n", "    - serve: Launches gunicorn server\n", "    - OFA source code cloned from GitHub\n", "    - test_predictor.py: unit tests for the predictor\n", "    - test.sh: runs the unit tests when the container starts locally\n", "  - Dockerfile" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 1. Install Dependencies\n", "\n", "### 1(a) Setup Local Environment\n", "This step creates the required folders, installs OFA and its bundled fairseq, and downloads the pre-trained OFA-large checkpoint.\n", "Make sure you start in the same directory as this notebook." 
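] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The image only builds correctly if the files from the container structure in the introduction are in place. Once the later steps have written them, a small helper like the one below can report anything that is missing before you run `docker build`. It is a hypothetical sketch, not part of the original workflow; `EXPECTED_FILES` is an assumption based on that list, plus `test.sh` and `cat.png`, which the Dockerfile and unit tests also need.\n",
"\n",
"```python\n",
"from pathlib import Path\n",
"\n",
"# Assumption: mirrors the container layout in the introduction, plus the\n",
"# test script and sample image that the Dockerfile and unit tests rely on.\n",
"EXPECTED_FILES = [\n",
"    'Dockerfile',\n",
"    'OFA/predictor.py',\n",
"    'OFA/wsgi.py',\n",
"    'OFA/nginx.conf',\n",
"    'OFA/serve',\n",
"    'OFA/test_predictor.py',\n",
"    'OFA/test.sh',\n",
"    'OFA/cat.png',\n",
"]\n",
"\n",
"\n",
"def missing_container_files(root='container'):\n",
"    # Return the expected paths (relative to root) that are not regular files yet.\n",
"    base = Path(root)\n",
"    return [name for name in EXPECTED_FILES if not (base / name).is_file()]\n",
"```\n",
"\n",
"Run from the notebook directory, `missing_container_files()` should return an empty list just before the image is built." 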
] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "%%sh\n", "\n", "mkdir -p container\n", "\n", "cd container\n", "\n", "git clone https://github.com/OFA-Sys/OFA\n", "\n", "cd OFA\n", "\n", "pip install -r requirements.txt\n", "\n", "pip install --upgrade gradio\n", "\n", "cd fairseq\n", "\n", "pip install ./\n", "\n", "cd ../..\n", "\n", "wget https://ofa-silicon.oss-us-west-1.aliyuncs.com/checkpoints/ofa_large_clean.pt\n", "\n", "mkdir -p checkpoints\n", "\n", "mv ofa_large_clean.pt checkpoints/ofa_large_clean.pt\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1(b) Set up SageMaker environment\n", "This gives us access to basic information and functionality for our SageMaker environment, including the IAM role we are going to use in the next setup step." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "import sagemaker\n", "import boto3\n", "sess = sagemaker.Session()\n", "# sagemaker session bucket -> used for uploading data, models and logs\n", "# sagemaker will automatically create this bucket if it does not exist\n", "sagemaker_session_bucket=None\n", "if sagemaker_session_bucket is None and sess is not None:\n", "    # set to default bucket if a bucket name is not given\n", "    sagemaker_session_bucket = sess.default_bucket()\n", "\n", "try:\n", "    role = sagemaker.get_execution_role()\n", "except ValueError:\n", "    iam = boto3.client('iam')\n", "    role = iam.get_role(RoleName='sagemaker_execution_role')['Role']['Arn']\n", "\n", "sess = sagemaker.Session(default_bucket=sagemaker_session_bucket)\n", "\n", "#used later on when deploying a model:\n", "sm_client = boto3.client(service_name='sagemaker')\n", "runtime_sm_client = boto3.client(service_name='sagemaker-runtime')\n", "account_id = boto3.client('sts').get_caller_identity()['Account']\n", "region = boto3.Session().region_name\n", "\n", 
"print(f\"sagemaker role arn: {role}\")\n", "print(f\"sagemaker bucket: {sess.default_bucket()}\")\n", "print(f\"sagemaker session region: {sess.boto_region_name}\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1(c) Set up IAM permissions\n", "In addition to the default IAM permissions, you need to add S3 access (because the endpoint reads input images from S3 and writes generated images there) and ECR access (because we push our custom Docker container to ECR for use by SageMaker)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Add this policy for ECR access. Note that it grants permission to create a new repository as well as to push images to it.\n", "\n", "```json\n", "{\n", "  \"Version\": \"2012-10-17\",\n", "  \"Statement\": [\n", "    {\n", "      \"Effect\": \"Allow\",\n", "      \"Action\": [\n", "        \"ecr:CompleteLayerUpload\",\n", "        \"ecr:GetAuthorizationToken\",\n", "        \"ecr:UploadLayerPart\",\n", "        \"ecr:InitiateLayerUpload\",\n", "        \"ecr:BatchCheckLayerAvailability\",\n", "        \"ecr:PutImage\",\n", "        \"ecr:CreateRepository\"\n", "      ],\n", "      \"Resource\": \"*\"\n", "    }\n", "  ]\n", "}\n", "```\n", "\n", "For S3 access, attach the AWS managed IAM policy **AmazonS3FullAccess** to the role used by SageMaker.\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 2. Test the model locally\n", "This step lets you get a feel for what the model can do and how to use its hyperparameters. 
Feel free to skip this step if you are already comfortable with the model.\n", "\n", "The following fix works around a version incompatibility between PyTorch and Horovod: it reinstalls Horovod with its PyTorch support enabled." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!pip uninstall -y horovod\n", "# set the variable on the same line so it applies to the pip build\n", "!HOROVOD_WITH_PYTORCH=1 pip install --no-cache-dir horovod" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "import sys\n", "import os\n", "print(os.getcwd())\n", "sys.path.append(os.path.normpath(os.path.join(os.getcwd(), 'container/OFA')))\n", "\n", "from flask import Flask\n", "import flask\n", "import json\n", "import logging\n", "\n", "import boto3\n", "import io\n", "\n", "import torch\n", "import numpy as np\n", "from fairseq import checkpoint_utils\n", "from fairseq import options, tasks, utils\n", "from fairseq.dataclass.utils import convert_namespace_to_omegaconf\n", "from tasks.mm_tasks.refcoco import RefcocoTask\n", "from PIL import Image\n", "from torchvision import transforms\n", "import cv2\n", "import gradio as gr\n", "\n", "\n", "\n", "OFA_TASK_IMAGE_CAPTION=\"OFA_TASK_IMAGE_CAPTION\"\n", "OFA_TASK_VISUAL_QA=\"OFA_TASK_VISUAL_QA\"\n", "OFA_TASK_VISUAL_GROUNDING=\"OFA_TASK_VISUAL_GROUNDING\"\n", "OFA_TASK_TEXT2IMAGE=\"OFA_TASK_TEXT2IMAGE\"\n", "\n", "# Register\n", "tasks.register_task('refcoco', RefcocoTask)\n", "\n", "# turn on cuda if GPU is available\n", "use_cuda = torch.cuda.is_available()\n", "# use fp16 only when GPU is available\n", "use_fp16 = False\n", "\n", "# specify some options for evaluation\n", "parser = options.get_generation_parser()\n", "input_args = [\"\", \"--task=refcoco\", \"--beam=10\", \"--path=container/checkpoints/ofa_large_clean.pt\", \"--bpe-dir=container/OFA/utils/BPE\"]\n", "args = options.parse_args_and_arch(parser, input_args)\n", "cfg = 
convert_namespace_to_omegaconf(args)\n", "\n", "# Load pretrained ckpt & config\n", "task = tasks.setup_task(cfg.task)\n", "models, cfg = checkpoint_utils.load_model_ensemble(\n", " utils.split_paths(cfg.common_eval.path),\n", " task=task\n", ")\n", "\n", "# Move models to GPU\n", "for model in models:\n", " model.eval()\n", " if use_fp16:\n", " model.half()\n", " if use_cuda and not cfg.distributed_training.pipeline_model_parallel:\n", " model.cuda()\n", " model.prepare_for_inference_(cfg)\n", "\n", "# Initialize generator\n", "generator = task.build_generator(models, cfg.generation)\n", "\n", "mean = [0.5, 0.5, 0.5]\n", "std = [0.5, 0.5, 0.5]\n", "\n", "patch_resize_transform = transforms.Compose([\n", " lambda image: image.convert(\"RGB\"),\n", " transforms.Resize((task.cfg.patch_image_size, task.cfg.patch_image_size), interpolation=Image.BICUBIC),\n", " transforms.ToTensor(),\n", " transforms.Normalize(mean=mean, std=std),\n", "])\n", "\n", "# Text preprocess\n", "bos_item = torch.LongTensor([task.src_dict.bos()])\n", "eos_item = torch.LongTensor([task.src_dict.eos()])\n", "pad_idx = task.src_dict.pad()\n", "\n", "\n", "def get_symbols_to_strip_from_output(generator):\n", " if hasattr(generator, \"symbols_to_strip_from_output\"):\n", " return generator.symbols_to_strip_from_output\n", " else:\n", " return {generator.bos, generator.eos}\n", "\n", "\n", "def decode_fn(x, tgt_dict, bpe, generator, tokenizer=None):\n", " x = tgt_dict.string(x.int().cpu(), extra_symbols_to_ignore=get_symbols_to_strip_from_output(generator))\n", " token_result = []\n", " bin_result = []\n", " img_result = []\n", " for token in x.strip().split():\n", " if token.startswith('\".format(int((coord_list[0] * w_resize_ratio / task.cfg.max_image_size * (task.cfg.num_bins - 1))))]\n", " bin_list += [\n", " \"\".format(int((coord_list[1] * h_resize_ratio / task.cfg.max_image_size * (task.cfg.num_bins - 1))))]\n", " bin_list += [\n", " \"\".format(int((coord_list[2] * w_resize_ratio / 
task.cfg.max_image_size * (task.cfg.num_bins - 1))))]\n", " bin_list += [\n", " \"\".format(int((coord_list[3] * h_resize_ratio / task.cfg.max_image_size * (task.cfg.num_bins - 1))))]\n", " return ' '.join(bin_list)\n", "\n", "\n", "def bin2coord(bins, w_resize_ratio, h_resize_ratio):\n", " bin_list = [int(bin[5:-1]) for bin in bins.strip().split()]\n", " coord_list = []\n", " coord_list += [bin_list[0] / (task.cfg.num_bins - 1) * task.cfg.max_image_size / w_resize_ratio]\n", " coord_list += [bin_list[1] / (task.cfg.num_bins - 1) * task.cfg.max_image_size / h_resize_ratio]\n", " coord_list += [bin_list[2] / (task.cfg.num_bins - 1) * task.cfg.max_image_size / w_resize_ratio]\n", " coord_list += [bin_list[3] / (task.cfg.num_bins - 1) * task.cfg.max_image_size / h_resize_ratio]\n", " return coord_list\n", "\n", "\n", "def encode_text(text, length=None, append_bos=False, append_eos=False):\n", " line = [\n", " task.bpe.encode(' {}'.format(word.strip()))\n", " if not word.startswith('\".format(int((coord_list[0] * w_resize_ratio / task.cfg.max_image_size * (task.cfg.num_bins - 1))))]\n", " bin_list += [\n", " \"\".format(int((coord_list[1] * h_resize_ratio / task.cfg.max_image_size * (task.cfg.num_bins - 1))))]\n", " bin_list += [\n", " \"\".format(int((coord_list[2] * w_resize_ratio / task.cfg.max_image_size * (task.cfg.num_bins - 1))))]\n", " bin_list += [\n", " \"\".format(int((coord_list[3] * h_resize_ratio / task.cfg.max_image_size * (task.cfg.num_bins - 1))))]\n", " return ' '.join(bin_list)\n", "\n", "\n", "def bin2coord(bins, w_resize_ratio, h_resize_ratio):\n", " bin_list = [int(bin[5:-1]) for bin in bins.strip().split()]\n", " coord_list = []\n", " coord_list += [bin_list[0] / (task.cfg.num_bins - 1) * task.cfg.max_image_size / w_resize_ratio]\n", " coord_list += [bin_list[1] / (task.cfg.num_bins - 1) * task.cfg.max_image_size / h_resize_ratio]\n", " coord_list += [bin_list[2] / (task.cfg.num_bins - 1) * task.cfg.max_image_size / w_resize_ratio]\n", " 
coord_list += [bin_list[3] / (task.cfg.num_bins - 1) * task.cfg.max_image_size / h_resize_ratio]\n", " return coord_list\n", "\n", "\n", "def encode_text(text, length=None, append_bos=False, append_eos=False):\n", " line = [\n", " task.bpe.encode(' {}'.format(word.strip()))\n", " if not word.startswith('\"\n", " print(f\"Instruction:{instruction}\")\n", " print (f\"New request:{bucket_name}:{key_name}\")\n", "\n", " if ofa_task in [OFA_TASK_VISUAL_QA,OFA_TASK_VISUAL_GROUNDING,OFA_TASK_IMAGE_CAPTION]:\n", " #download the image from S3 URL\n", " s3 = boto3.client('s3')\n", " s3_response_object = s3.get_object(Bucket=bucket_name, Key=key_name)\n", " image_data = s3_response_object['Body'].read()\n", " image = Image.open(io.BytesIO(image_data))\n", "\n", "\n", " output_img, tokens=general_interface(image,instruction)\n", "\n", " if output_img:\n", " print(\"uploading file\")\n", " s3 = boto3.client('s3')\n", " s3.upload_fileobj(output_img, bucket_name, instruction+key_name)\n", "\n", " result = json.dumps( {'output':tokens})\n", "\n", " return flask.Response(response=result, status=200, mimetype='application/json')\n", "\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's make a small modification to the serve file. By default, it starts one gunicorn worker process per CPU core, but that doesn't work in our case because inference is GPU-bound and every worker would load its own copy of the model. All the original serve file lines are kept, but we add `model_server_workers = 1` so the endpoint loads only a single model into the GPU. Additional logic may be required on instance types with multiple GPUs." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "%%writefile container/OFA/serve\n", "#!/usr/bin/env python\n", "\n", "# This file implements the scoring service shell. You don't necessarily need to modify it for various\n", "# algorithms. 
It starts nginx and gunicorn with the correct configurations and then simply waits until\n", "# gunicorn exits.\n", "#\n", "# The flask server is specified to be the app object in wsgi.py\n", "#\n", "# We set the following parameters:\n", "#\n", "# Parameter Environment Variable Default Value\n", "# --------- -------------------- -------------\n", "# number of workers MODEL_SERVER_WORKERS the number of CPU cores\n", "# timeout MODEL_SERVER_TIMEOUT 60 seconds\n", "\n", "import multiprocessing\n", "import os\n", "import signal\n", "import subprocess\n", "import sys\n", "\n", "cpu_count = multiprocessing.cpu_count()\n", "\n", "model_server_timeout = os.environ.get('MODEL_SERVER_TIMEOUT', 60)\n", "model_server_workers = int(os.environ.get('MODEL_SERVER_WORKERS', cpu_count))\n", "\n", "#for our GPU based inference, set to one. This process is GPU bound, and the GPU may run out of space if more than one model is loaded.\n", "model_server_workers = 1\n", "\n", "def sigterm_handler(nginx_pid, gunicorn_pid):\n", " try:\n", " os.kill(nginx_pid, signal.SIGQUIT)\n", " except OSError:\n", " pass\n", " try:\n", " os.kill(gunicorn_pid, signal.SIGTERM)\n", " except OSError:\n", " pass\n", "\n", " sys.exit(0)\n", "\n", "def start_server():\n", " print('Starting the inference server with {} workers.'.format(model_server_workers))\n", "\n", "\n", " # link the log streams to stdout/err so they will be logged to the container logs\n", " subprocess.check_call(['ln', '-sf', '/dev/stdout', '/var/log/nginx/access.log'])\n", " subprocess.check_call(['ln', '-sf', '/dev/stderr', '/var/log/nginx/error.log'])\n", "\n", " nginx = subprocess.Popen(['nginx', '-c', '/OFA/nginx.conf'])\n", " gunicorn = subprocess.Popen(['gunicorn',\n", " '--timeout', str(model_server_timeout),\n", " '-k', 'sync',\n", " '-b', 'unix:/tmp/gunicorn.sock',\n", " '-w', str(model_server_workers),\n", " 'wsgi:app'])\n", "\n", " signal.signal(signal.SIGTERM, lambda a, b: sigterm_handler(nginx.pid, gunicorn.pid))\n", "\n", " 
# If either subprocess exits, so do we.\n", " pids = set([nginx.pid, gunicorn.pid])\n", " while True:\n", " pid, _ = os.wait()\n", " if pid in pids:\n", " break\n", "\n", " sigterm_handler(nginx.pid, gunicorn.pid)\n", " print('Inference server exiting')\n", "\n", "# The main routine just invokes the start function.\n", "\n", "if __name__ == '__main__':\n", " start_server()\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we write an nginx configuration file. This is standard for most custom containers, and we don't make any changes for this model." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "%%writefile container/OFA/nginx.conf\n", "\n", "worker_processes 1;\n", "daemon off; # Prevent forking\n", "\n", "\n", "pid /tmp/nginx.pid;\n", "error_log /var/log/nginx/error.log;\n", "\n", "events {\n", " # defaults\n", "}\n", "\n", "http {\n", " include /etc/nginx/mime.types;\n", " default_type application/octet-stream;\n", " access_log /var/log/nginx/access.log combined;\n", "\n", " upstream gunicorn {\n", " server unix:/tmp/gunicorn.sock;\n", " }\n", "\n", " server {\n", " listen 8080 deferred;\n", " client_max_body_size 5m;\n", "\n", " keepalive_timeout 5;\n", " proxy_read_timeout 1200s;\n", "\n", " location ~ ^/(ping|invocations) {\n", " proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;\n", " proxy_set_header Host $http_host;\n", " proxy_redirect off;\n", " proxy_pass http://gunicorn;\n", " }\n", "\n", " location / {\n", " return 404 \"{}\";\n", " }\n", " }\n", "}\n", "\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Finally, we write a quick wrapper for wsgi, so that the web server can find and load our inference script." 
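] }, { "cell_type": "markdown", "metadata": {}, "source": [ "SageMaker sends health checks to `GET /ping` and inference requests to `POST /invocations` on port 8080. The nginx configuration above forwards both paths to gunicorn, which serves whatever WSGI callable `wsgi.py` exposes as `app`; a Flask app object is itself such a callable, so `wsgi.py` can simply re-export `predictor.app`. To make that contract concrete, here is a hypothetical echo app written against plain WSGI (PEP 3333) with no Flask. It is an illustration only, not a replacement for `predictor.py`.\n",
"\n",
"```python\n",
"import json\n",
"\n",
"\n",
"def app(environ, start_response):\n",
"    # Hypothetical stand-in for predictor.app, using the raw WSGI interface.\n",
"    path = environ.get('PATH_INFO', '')\n",
"    method = environ.get('REQUEST_METHOD', 'GET')\n",
"    if path == '/ping' and method == 'GET':\n",
"        # Health check: an empty 200 response marks the container as healthy.\n",
"        status, body = '200 OK', b''\n",
"    elif path == '/invocations' and method == 'POST':\n",
"        length = int(environ.get('CONTENT_LENGTH') or 0)\n",
"        request = json.loads(environ['wsgi.input'].read(length) or b'{}')\n",
"        # A real predictor would run the model here; this sketch echoes the request.\n",
"        status, body = '200 OK', json.dumps({'echo': request}).encode('utf-8')\n",
"    else:\n",
"        status, body = '404 Not Found', b'{}'\n",
"    start_response(status, [('Content-Type', 'application/json'),\n",
"                            ('Content-Length', str(len(body)))])\n",
"    return [body]\n",
"```\n",
"\n",
"gunicorn would serve this exactly as it serves the real predictor: the `serve` script starts it with the module path `wsgi:app`." 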
] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "%%writefile container/OFA/wsgi.py\n", "import predictor as myapp\n", "\n", "# This is just a simple wrapper for gunicorn to find your app.\n", "# If you want to change the algorithm file, simply change \"predictor\" above to the\n", "# new file.\n", "\n", "app = myapp.app\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now that we have all the files we need for our container, we create the Dockerfile, which tells Docker how to build the container. Most of this file is the same for any custom container, but note the `RUN pip install` lines: they install OFA's requirements and its bundled fairseq (the same packages we used for local testing), plus torchvision, gradio, and boto3, which handles the upload of generated images to S3." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "%%writefile container/Dockerfile\n", "FROM python:3.8\n", "\n", "RUN apt-get -y update && apt-get install -y --no-install-recommends \\\n", "    wget \\\n", "    python3 \\\n", "    nginx \\\n", "    ca-certificates \\\n", "    git \\\n", "    ffmpeg \\\n", "    libsm6 \\\n", "    libxext6 \\\n", "    && rm -rf /var/lib/apt/lists/*\n", "\n", "RUN wget https://bootstrap.pypa.io/get-pip.py && python3 get-pip.py && \\\n", "    pip install flask gevent gunicorn && \\\n", "    rm -rf /root/.cache\n", "\n", "RUN git clone https://github.com/OFA-Sys/OFA\n", "RUN cd OFA && pip install -r requirements.txt\n", "\n", "RUN cd OFA && wget https://ofa-silicon.oss-us-west-1.aliyuncs.com/checkpoints/ofa_large_clean.pt &&\\\n", "    mkdir -p checkpoints &&\\\n", "    mv ofa_large_clean.pt checkpoints/ofa_large_clean.pt\n", "\n", "RUN pip install --upgrade torchvision boto3 gradio\n", "RUN cd OFA/fairseq &&\\\n", "    pip install ./\n", "\n", "\n", "#testing\n", "RUN pip install pytest behave\n", "\n", 
"\n", "COPY OFA /OFA\n", "WORKDIR /OFA\n", "ENV PATH=\"/OFA:${PATH}\"\n", "RUN chmod +x /OFA/serve\n", "\n", "RUN [\"chmod\", \"+x\", \"/OFA/test.sh\"]\n", "\n", "CMD [\"./test.sh\"]\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now create Python unit tests for the OFA model." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "%%writefile container/OFA/test_predictor.py\n", "import os\n", "import uuid\n", "import boto3\n", "\n", "\n", "import predictor as myapp\n", "import pytest\n", "import json\n", "\n", "bucket_name = f\"pytest-ofa-{str(uuid.uuid4())}\"\n", "print(f\"bucket name:{bucket_name}\")\n", "file_name=\"cat.png\"\n", "\n", "def setup_module(module):\n", "    \"\"\"Create a temporary bucket and upload the test image.\"\"\"\n", "    s3_client = boto3.client('s3')\n", "    # buckets outside us-east-1 need an explicit LocationConstraint\n", "    region = s3_client.meta.region_name\n", "    if region == 'us-east-1':\n", "        s3_client.create_bucket(Bucket=bucket_name)\n", "    else:\n", "        s3_client.create_bucket(Bucket=bucket_name, CreateBucketConfiguration={'LocationConstraint': region})\n", "    s3_client.upload_file(file_name, bucket_name, file_name)\n", "    print(\"copied files\")\n", "\n", "def teardown_module(module):\n", "    \"\"\"Delete all objects in the temporary bucket, then the bucket itself.\"\"\"\n", "    # delete all objects\n", "    s3_client = boto3.client('s3')\n", "    response = s3_client.list_objects(Bucket=bucket_name)\n", "    if 'Contents' in response:\n", "        for content in response['Contents']:\n", "            s3_client.delete_object(Bucket=bucket_name, Key=content['Key'])\n", "\n", "    # delete bucket\n", "    s3_client.delete_bucket(Bucket=bucket_name)\n", "    print(\"Deleted bucket and files\")\n", "\n", "@pytest.fixture\n", "def client():\n", "    with myapp.app.test_client() as client:\n", "        yield client\n", "\n", "def test_ping(client):\n", "    \"\"\"Test ping operation\"\"\"\n", "    print(\"test ping\")\n", "    rv = client.get('/ping')\n", "    print(rv.data)\n", "    print(\"done ping\")\n", "\n", "def test_image_caption(client):\n", "    \"\"\"Test image caption\"\"\"\n", "    print(\"inside the test function\")\n", "    rv = client.post('/invocations', json={\n", "        \"ofa_task\":\"OFA_TASK_IMAGE_CAPTION\",\n", "        \"bucket_name\" : bucket_name,\n", "        \"key_name\" : file_name\n", "    })\n", "    print(rv.data)\n", "    data = json.loads(rv.data)\n", "    assert \"a cat wearing a face mask\" in data['output']\n", "\n", "\n", "def test_visual_qa(client):\n", "    \"\"\"Test visual qa\"\"\"\n", "    print(\"inside the test function\")\n", "    rv = client.post('/invocations', json={\n", "        \"ofa_task\":\"OFA_TASK_VISUAL_QA\",\n", "        \"bucket_name\" : bucket_name,\n", "        \"key_name\" : file_name,\n", "        \"instruction\": \"what is cat wearing?\"\n", "\n", "    })\n", "    print(rv.data)\n", "    data = json.loads(rv.data)\n", "    assert \"mask\" in data['output']\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Create a test shell script that runs the unit tests when the container is started locally." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "%%writefile container/OFA/test.sh\n", "#!/bin/sh\n", "\n", "pytest -rPv --capture=no test_predictor.py" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Copy the sample image used by the unit tests." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "!cp cat.png container/OFA" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 4. Create a custom Docker container for this inference script" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "First, clean out any old Docker images to prevent your Jupyter instance from running out of space." 
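] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The introduction recommends at least 15GB of free storage for the model download and the image build. As an optional guard, a hypothetical helper like this one reports whether a volume still has that much room, using only the standard library's `shutil.disk_usage`. The 15GB threshold comes from the introduction; which volume to check is an assumption you should verify, because Docker's data directory (shown as *Docker Root Dir* by `docker info`) may live on a different volume than the notebook.\n",
"\n",
"```python\n",
"import shutil\n",
"\n",
"\n",
"def free_gib(path='.'):\n",
"    # Free space on the filesystem that contains `path`, in GiB.\n",
"    return shutil.disk_usage(path).free / (1024 ** 3)\n",
"\n",
"\n",
"def has_room_for_build(path='.', min_free_gib=15.0):\n",
"    # Default threshold follows the storage guidance in the introduction.\n",
"    return free_gib(path) >= min_free_gib\n",
"```\n",
"\n",
"Pass the directory that `docker info` reports as its root (by default `/var/lib/docker` on Linux) to check the volume Docker actually writes to." 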
] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "!docker system prune -af" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now build the Docker image and run it locally. The container's default command (`test.sh`) runs the unit tests against the model." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "%%capture capt\n", "%%time\n", "algorithm_name=\"ofa\"\n", "!docker build -t $algorithm_name container/.\n", "# no -it: the notebook shell has no TTY to attach to\n", "!docker run --rm $algorithm_name" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Print any errors captured from the build and test run above." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "capt.stderr" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we set up the name variables in the shell, create a repository in ECR for the container if one does not already exist, build the image, and push it to ECR." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "%%capture capt\n", "%%sh\n", "set -x\n", "\n", "# The name of our algorithm\n", "algorithm_name=ofa\n", "\n", "cd container\n", "\n", "account=$(aws sts get-caller-identity --query Account --output text)\n", "\n", "# Get the region defined in the current configuration (default to us-west-2 if none defined)\n", "region=$(aws configure get region)\n", "region=${region:-us-west-2}\n", "\n", "fullname=\"${account}.dkr.ecr.${region}.amazonaws.com/${algorithm_name}:latest\"\n", "\n", "# If the repository doesn't exist in ECR, create it.\n", "\n", "aws ecr describe-repositories --repository-names \"${algorithm_name}\" > /dev/null 2>&1\n", "\n", "if [ $? 
-ne 0 ]\n", "then\n", "    aws ecr create-repository --repository-name \"${algorithm_name}\" > /dev/null\n", "fi\n", "\n", "# Log Docker in to ECR (aws ecr get-login was removed in AWS CLI v2)\n", "aws ecr get-login-password --region ${region} | docker login --username AWS --password-stdin ${account}.dkr.ecr.${region}.amazonaws.com\n", "\n", "# Get the login command from ECR in order to pull down the SageMaker PyTorch image\n", "#$(aws ecr get-login --registry-ids 520713654638 --region ${region} --no-include-email)\n", "\n", "# Build the docker image locally with the image name and then push it to ECR\n", "# with the full name.\n", "\n", "docker build -t ${algorithm_name} . --build-arg REGION=${region}\n", "docker tag ${algorithm_name} ${fullname}\n", "\n", "docker push ${fullname}\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Print the output captured from the shell script above." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "capt.stdout" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 5. Define and Deploy the model" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5 a) Define the model object\n", "Now that the container is built, we can start to set up the SageMaker endpoint. The first step is to create a [SageMaker model object](https://docs.aws.amazon.com/sagemaker/latest/dg/realtime-endpoints-deployment.html#realtime-endpoints-deployment-create-model), which combines a unique name for the model, the location of the image we just built, and the role the endpoint should use. Here we use the same role as this notebook." 
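] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The cell below calls `create_model` directly. If you want to inspect the request before sending it, or reuse the naming scheme elsewhere, the same inputs can be assembled by a pure function. `build_create_model_request` is a hypothetical helper, not part of the SageMaker SDK; it reproduces the cell's UTC-timestamped model name and the ECR image URI that the build script pushed to.\n",
"\n",
"```python\n",
"from datetime import datetime, timezone\n",
"\n",
"\n",
"def build_create_model_request(model, account_id, region, role_arn, now=None):\n",
"    # Same naming scheme as the cell below: '<model>-YYYY-mm-dd-HH-MM-SS' in UTC.\n",
"    now = now or datetime.now(timezone.utc)\n",
"    stamp = now.strftime('%Y-%m-%d-%H-%M-%S')\n",
"    # Image tag pushed to ECR by the build script in section 4.\n",
"    image_uri = f'{account_id}.dkr.ecr.{region}.amazonaws.com/{model}:latest'\n",
"    return {\n",
"        'ModelName': f'{model}-{stamp}',\n",
"        'ExecutionRoleArn': role_arn,\n",
"        'Containers': [{'Image': image_uri}],\n",
"    }\n",
"```\n",
"\n",
"Passing the result as keyword arguments, `sm_client.create_model(**build_create_model_request(model, account_id, region, role))`, sends the same request as the cell below." 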
] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "model='ofa'\n", "\n", "import boto3\n", "from sagemaker import get_execution_role\n", "\n", "sm_client = boto3.client(service_name='sagemaker')\n", "runtime_sm_client = boto3.client(service_name='sagemaker-runtime')\n", "\n", "account_id = boto3.client('sts').get_caller_identity()['Account']\n", "region = boto3.Session().region_name\n", "\n", "\n", "role = get_execution_role()\n", "\n", "\n", "from time import gmtime, strftime\n", "\n", "model_name = f\"{model}-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}\"\n", "container = f\"{account_id}.dkr.ecr.{region}.amazonaws.com/{model}:latest\"\n", "\n", "print('Model name: ' + model_name)\n", "print('Container image: ' + container)\n", "\n", "container = {\n", "    'Image': container\n", "}\n", "\n", "create_model_response = sm_client.create_model(\n", "    ModelName = model_name,\n", "    ExecutionRoleArn = role,\n", "    Containers = [container])\n", "\n", "print(\"Model Arn: \" + create_model_response['ModelArn'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5 b) Define the Endpoint Configuration\n", "Now that we've defined the model object, we set up the [Endpoint Configuration](https://docs.aws.amazon.com/sagemaker/latest/dg/realtime-endpoints-deployment.html#realtime-endpoints-deployment-create-endpoint-config). This is where we specify the kind of machine the model runs on. Here, we use ml.g4dn.2xlarge, an instance type with one GPU, with an initial instance count of 1. No autoscaling policy is attached, so the endpoint stays at a single instance, which helps keep costs down while testing." 
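] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The endpoint configuration below is a single production variant that receives all traffic on a fixed number of instances. A hypothetical helper such as `build_endpoint_config_request` (not part of boto3) makes that sizing explicit and validates it before the API call. Growing beyond `InitialInstanceCount` would require registering a separate Application Auto Scaling policy, which this notebook does not do.\n",
"\n",
"```python\n",
"def build_endpoint_config_request(config_name, model_name,\n",
"                                  instance_type='ml.g4dn.2xlarge', instance_count=1):\n",
"    # One production variant that receives all traffic; the instance count\n",
"    # stays fixed unless an autoscaling policy is registered separately.\n",
"    if instance_count < 1:\n",
"        raise ValueError('instance_count must be at least 1')\n",
"    return {\n",
"        'EndpointConfigName': config_name,\n",
"        'ProductionVariants': [{\n",
"            'VariantName': 'AllTraffic',\n",
"            'ModelName': model_name,\n",
"            'InstanceType': instance_type,\n",
"            'InitialInstanceCount': instance_count,\n",
"            'InitialVariantWeight': 1,\n",
"        }],\n",
"    }\n",
"```\n",
"\n",
"With it, the cell below reduces to `sm_client.create_endpoint_config(**build_endpoint_config_request(endpoint_config_name, model_name))`." 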
] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "endpoint_config_name = f\"{model}-config{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}\"\n", "print('Endpoint config name: ' + endpoint_config_name)\n", "\n", "instance_type = 'ml.g4dn.2xlarge'\n", "\n", "create_endpoint_config_response = sm_client.create_endpoint_config(\n", "    EndpointConfigName = endpoint_config_name,\n", "    ProductionVariants=[{\n", "        'InstanceType': instance_type,\n", "        'InitialInstanceCount': 1,\n", "        'InitialVariantWeight': 1,\n", "        'ModelName': model_name,\n", "        'VariantName': 'AllTraffic'}])\n", "\n", "print(\"Endpoint config Arn: \" + create_endpoint_config_response['EndpointConfigArn'])" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5 c) Deploy the endpoint\n", "Now that we have set up the configuration for the model and for the endpoint, we can bring the two together and actually [deploy the model](https://docs.aws.amazon.com/sagemaker/latest/dg/realtime-endpoints-deployment.html#w1131aac27c17b9b9b7c29). Note that this step may take up to 10 minutes while the endpoint is provisioned." 
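] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The cell below relies on boto3's built-in `endpoint_in_service` waiter. If you would rather see the failure reason when a deployment goes wrong, a hand-rolled polling loop is a simple alternative. `wait_for_endpoint` is a hypothetical helper; it accepts any callable shaped like `sm_client.describe_endpoint`, which also makes it easy to exercise without AWS. The poll interval and timeout defaults are assumptions, chosen to comfortably cover the roughly 10-minute deployment time mentioned above.\n",
"\n",
"```python\n",
"import time\n",
"\n",
"\n",
"def wait_for_endpoint(describe, endpoint_name, poll_seconds=30, timeout_seconds=1800):\n",
"    # Poll EndpointStatus until the endpoint is InService, fails, or times out.\n",
"    deadline = time.monotonic() + timeout_seconds\n",
"    while True:\n",
"        resp = describe(EndpointName=endpoint_name)\n",
"        status = resp['EndpointStatus']\n",
"        if status == 'InService':\n",
"            return status\n",
"        if status == 'Failed':\n",
"            reason = resp.get('FailureReason', 'no reason given')\n",
"            raise RuntimeError(f'Endpoint {endpoint_name} failed: {reason}')\n",
"        if time.monotonic() >= deadline:\n",
"            raise TimeoutError(f'Endpoint {endpoint_name} is still {status}')\n",
"        time.sleep(poll_seconds)\n",
"```\n",
"\n",
"In the notebook this would be called as `wait_for_endpoint(sm_client.describe_endpoint, endpoint_name)`." 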
] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "%%time\n", "\n", "import time\n", "\n", "endpoint_name = f\"{model}-endpoint{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}\"\n", "print('Endpoint name: ' + endpoint_name)\n", "\n", "create_endpoint_response = sm_client.create_endpoint(\n", "    EndpointName=endpoint_name,\n", "    EndpointConfigName=endpoint_config_name)\n", "print('Endpoint Arn: ' + create_endpoint_response['EndpointArn'])\n", "\n", "resp = sm_client.describe_endpoint(EndpointName=endpoint_name)\n", "status = resp['EndpointStatus']\n", "print(\"Endpoint Status: \" + status)\n", "\n", "print('Waiting for {} endpoint to be in service...'.format(endpoint_name))\n", "waiter = sm_client.get_waiter('endpoint_in_service')\n", "waiter.wait(EndpointName=endpoint_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 6. Test the Endpoint\n", "Now that the endpoint is online, we can test it using the [Invoke Endpoint](https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/sagemaker-runtime.html#SageMakerRuntime.Client.invoke_endpoint) call of the SageMaker runtime client. The endpoint reads its input image from S3 and returns the model output as JSON (for tasks that generate an image, it also writes that image to the same bucket), so we first create a test bucket and upload an image to it. Alternatively, we could set up a [REST API endpoint using API Gateway](https://aws.amazon.com/blogs/machine-learning/creating-a-machine-learning-powered-rest-api-with-amazon-api-gateway-mapping-templates-and-amazon-sagemaker/)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 6.1. 
Create a temporary bucket for testing" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "s3_client = boto3.client('s3')\n", "import uuid\n", "\n", "#create a temporary bucket\n", "\n", "bucket_name = f\"{model}-test-{str(uuid.uuid4())}\"\n", "print(f\"bucket name:{bucket_name}\")\n", "if region=='us-east-1':\n", "    s3_client.create_bucket(Bucket=bucket_name)\n", "else:\n", "    s3_client.create_bucket(Bucket=bucket_name,CreateBucketConfiguration={'LocationConstraint': region})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 6.2. Test Image Captioning\n", "\n", "In this test, we upload a sample image and ask the model to describe what it shows." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 6.2.1 Test the model with an image of a cat\n", "\n", "Upload a sample image of a cat wearing a mask to the test S3 bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "file_name=\"cat.png\"\n", "s3_client.upload_file(file_name, bucket_name, file_name)\n", "\n", "from PIL import Image\n", "#display the image\n", "Image.open(file_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Invoke the SageMaker endpoint with the bucket and key of the uploaded image, and retrieve the model output." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "%%time\n", "import json\n", "content_type = \"application/json\"\n", "request_body = {\n", "    \"ofa_task\":\"OFA_TASK_IMAGE_CAPTION\",\n", "    \"bucket_name\" : bucket_name,\n", "    \"key_name\" : file_name\n", "}\n", "\n", "payload = json.dumps(request_body)\n", "print(payload)\n", "\n", "#Endpoint invocation\n", "response = runtime_sm_client.invoke_endpoint(\n", "    EndpointName=endpoint_name,\n", "    ContentType=content_type,\n", "    Body=payload)\n", "\n", "#Parse results\n", 
"result = json.loads(response['Body'].read().decode())\n", "print(result)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# 7. Clean up resources\n", "Use this section to delete the resources deployed by this notebook. Don't forget to shut off the instance running this notebook when you are done! You may also want to empty and delete the temporary test bucket created in section 6.1." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [ "sm_client = boto3.client(service_name='sagemaker')\n", "sm_client.delete_model(ModelName=model_name)\n", "sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)\n", "sm_client.delete_endpoint(EndpointName=endpoint_name)" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "pycharm": { "name": "#%%\n" } }, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_pytorch_p38", "language": "python", "name": "conda_pytorch_p38" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.12" } }, "nbformat": 4, "nbformat_minor": 1 }