# Using Async Inference and JumpStart models on SageMaker to power pre-labeling workflows

1. [Set Up](#1.-Set-Up)
2. [Run inference on the pre-trained model](#2.-Run-inference-on-the-pre-trained-model)
 * [Retrieve JumpStart Artifacts & Deploy an Endpoint](#2.1.-Retrieve-model-artifacts-&-Deploy-to-an-endpoint)
 * [Download & process images for annotations](#2.2.-Download-images-for-annotations)
 * [Single prediction example](#2.3.-Single-prediction-example)
 * [Display model predictions](#2.4.-Display-model-predictions)
 * [Send images to Async endpoint](#2.5.-Send-images-to-Async-endpoint)
3. [Convert the model output annotations to SageMaker Ground Truth format](#3.0.-Convert-the-model-output-annotations-to-SageMaker-Ground-Truth-format)
4. [Create Bounding Box Verification Job](#4.0.-Create-Bounding-Box-verification-job)
 * [Execute API call to create the job](#4.1.-Execute-API-call-to-create-the-job)
 * [Complete Verification](#4.2.-Complete-verification) 
5. [Clean up the endpoint](#5.0.-Clean-up-the-endpoint)

Note: This notebook was tested on an ml.t3.medium instance in Amazon SageMaker Studio with the Python 3 (Data Science) kernel, and on an Amazon SageMaker notebook instance with the conda_python3 kernel.

## 1. Set Up

In [None]:
!pip install sagemaker --upgrade
!pip install awswrangler 

In [None]:
%load_ext autoreload
%autoreload 2

#### Permissions and environment variables

---
To host models on Amazon SageMaker, we need to set up and authenticate the use of AWS services. Here, we use the execution role associated with the current notebook as the AWS account role with SageMaker access. This role needs the necessary permissions, including access to your data in S3.

In [None]:
import sagemaker, boto3, json
import awswrangler as wr
from sagemaker import get_execution_role
from utils import *  # local helpers used below: upload_image, plot_response, convert_to_sm_gt_manifest, upload_file


aws_role = get_execution_role()
aws_region = boto3.Session().region_name
sess = sagemaker.Session()

## 2. Run inference on the pre-trained model

***
Using JumpStart, we can perform inference on the pre-trained model, even without fine-tuning it first on a new dataset.

### 2.1. Retrieve model artifacts & Deploy to an endpoint

There are three options to perform inference with an existing pre-trained model:

* Option A - Create a model from SageMaker JumpStart and deploy it to an endpoint.

* Option B - Use a model shared with your team or organization. Choose this option if you want to use a model developed by another team within your organization (e.g. Perception).

* Option C - Use an existing endpoint. Choose this option if you already have the model deployed in your account.

The following sections provide details.

#### Option A: Create model from SageMaker Jumpstart and deploy to an endpoint
***

Here, we download the JumpStart `models_manifest.json` file from the JumpStart S3 bucket, filter out all the Instance Segmentation models, and select a model for inference. We then retrieve the `deploy_image_uri`, `deploy_source_uri`, and `base_model_uri` for the pre-trained model. To host the pre-trained base model, we create an instance of [`sagemaker.model.Model`](https://sagemaker.readthedocs.io/en/stable/api/inference/model.html) and deploy it.

This involves the following steps:

* Create the model
* Set up Async Inference & deploy the model
* Set up autoscaling

If you have already created an endpoint and want to use it, proceed to Option C.

##### Create the model

In [None]:
# Download the JumpStart model manifest file from the regional JumpStart bucket.
boto3.client("s3").download_file(
    f"jumpstart-cache-prod-{aws_region}", "models_manifest.json", "models_manifest.json"
)
with open("models_manifest.json", "rb") as json_file:
    model_list = json.load(json_file)

# Keep only the Instance Segmentation ("-is-") models, without duplicates.
is_models = []
for model in model_list:
    model_id = model["model_id"]
    if "-is-" in model_id and model_id not in is_models:
        is_models.append(model_id)

is_models

From the list of models above, pick an instance segmentation model for the pre-labeling task.

NOTE: `model_version="*"` fetches the latest version of the model.
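The manifest can contain several entries for the same `model_id`, which is why the cell above deduplicates. As a minimal sketch, assuming each manifest entry also carries a `version` string made of dot-separated integers (check this against your downloaded `models_manifest.json`), the hypothetical helper `latest_is_versions` below reports the newest listed version of each instance segmentation model, which is presumably what `"*"` resolves to.

```python
from typing import Dict, List, Tuple


def version_key(version: str) -> Tuple[int, ...]:
    # "1.10.0" -> (1, 10, 0), so versions compare numerically rather than as strings.
    return tuple(int(part) for part in version.split("."))


def latest_is_versions(manifest: List[dict]) -> Dict[str, str]:
    """Map each instance segmentation model_id to the highest version listed.

    Assumes every entry has "model_id" and "version" keys (an assumption,
    not a documented contract).
    """
    latest: Dict[str, str] = {}
    for entry in manifest:
        model_id = entry["model_id"]
        if "-is-" not in model_id:
            continue  # same "-is-" filter as the cell above
        version = entry["version"]
        if model_id not in latest or version_key(version) > version_key(latest[model_id]):
            latest[model_id] = version
    return latest
```

For example, `latest_is_versions(model_list)` lists each candidate model together with its newest version in the manifest.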

In [None]:
model_id = 'mxnet-is-mask-rcnn-fpn-resnet101-v1d-coco'
model_version = "*"

In [None]:
from sagemaker import image_uris, model_uris, script_uris, hyperparameters
from sagemaker.model import Model
from sagemaker.predictor import Predictor
from sagemaker.utils import name_from_base

endpoint_name = name_from_base(f"jumpstart-example-infer-{model_id}")
inference_instance_type = "ml.p3.2xlarge"

# Retrieve the inference docker container uri
deploy_image_uri = image_uris.retrieve(
 region=None,
 framework=None, # automatically inferred from model_id
 image_scope="inference",
 model_id=model_id,
 model_version=model_version,
 instance_type=inference_instance_type,
)

# Retrieve the inference script uri. This includes scripts for model loading, inference handling etc.
deploy_source_uri = script_uris.retrieve(
 model_id=model_id, model_version=model_version, script_scope="inference"
)


# Retrieve the base model uri
base_model_uri = model_uris.retrieve(
 model_id=model_id, model_version=model_version, model_scope="inference"
)

# Create the SageMaker model instance
model = Model(
 image_uri=deploy_image_uri,
 source_dir=deploy_source_uri,
 model_data=base_model_uri,
 entry_point="inference.py", # entry point file in source_dir and present in deploy_source_uri
 role=aws_role,
 predictor_cls=Predictor,
 name=endpoint_name,
)

print(f'Model endpoint is {endpoint_name}')

##### Set up Async Inference & deploy the model
---

In [None]:
from sagemaker.async_inference.async_inference_config import AsyncInferenceConfig

async_config = AsyncInferenceConfig(
 output_path=f"s3://{sess.default_bucket()}/asyncinference/output",
 max_concurrent_invocations_per_instance=4,
 # Optionally specify Amazon SNS topics
 # notification_config = {
 # "SuccessTopic": "arn:aws:sns:::",
 # "ErrorTopic": "arn:aws:sns:::",
 # }
)

base_model_predictor = model.deploy(
 async_inference_config=async_config,
 instance_type=inference_instance_type,
 initial_instance_count=1,
 predictor_cls=Predictor,
 endpoint_name=endpoint_name
)

##### Set up Autoscaling
---

First, register your endpoint variant with Application Auto Scaling, then define a scaling policy and apply it. In this configuration, we use a customized metric (`CustomizedMetricSpecification`), `ApproximateBacklogSizePerInstance`, which is the number of queued requests divided by the number of instances behind the endpoint. Refer to the SageMaker Developer Guide for the full list of metrics available for asynchronous inference endpoints.
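To build intuition for `TargetValue` and `MaxCapacity`, here is a back-of-the-envelope sketch: if each instance should hold about `TargetValue` queued requests, the instance count needed is roughly the backlog divided by that target, clamped to the registered capacity range. `estimate_instances` is a hypothetical helper; the real target-tracking algorithm in Application Auto Scaling is more involved (for example, it waits out the configured cooldowns), so treat the numbers as a rough guide only.

```python
import math


def estimate_instances(backlog: int, target_per_instance: float = 5.0,
                       min_capacity: int = 1, max_capacity: int = 5) -> int:
    """Rough instance count that brings backlog-per-instance down to the target.

    Simplified intuition only, not the actual target-tracking algorithm.
    """
    if target_per_instance <= 0:
        raise ValueError("target_per_instance must be positive")
    needed = math.ceil(backlog / target_per_instance)
    # Clamp to the MinCapacity/MaxCapacity registered for the scalable target.
    return max(min_capacity, min(max_capacity, needed))
```

With the values used below (`TargetValue=5.0`, `MaxCapacity=5`), a backlog of 12 requests suggests about 3 instances, and any backlog above roughly 25 requests cannot be absorbed by scaling out further.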

In [None]:
client = boto3.client(
    "application-autoscaling"
)  # Common class representing Application Auto Scaling for SageMaker amongst other services

resource_id = (
    "endpoint/" + endpoint_name + "/variant/" + "AllTraffic"
)  # This is the format in which Application Auto Scaling references the endpoint variant

# Register the asynchronous endpoint variant as a scalable target
response = client.register_scalable_target(
    ServiceNamespace="sagemaker",
    ResourceId=resource_id,
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    MinCapacity=1,  # Keeps at least one instance; MinCapacity=0 would let the endpoint scale in to zero
    MaxCapacity=5,
)

response = client.put_scaling_policy(
    PolicyName="Invocations-ScalingPolicy",
    ServiceNamespace="sagemaker",  # The namespace of the AWS service that provides the resource
    ResourceId=resource_id,  # Endpoint variant resource ID
    ScalableDimension="sagemaker:variant:DesiredInstanceCount",  # SageMaker supports only instance count
    PolicyType="TargetTrackingScaling",  # 'StepScaling'|'TargetTrackingScaling'
    TargetTrackingScalingPolicyConfiguration={
        "TargetValue": 5.0,  # Target for ApproximateBacklogSizePerInstance (queued requests per instance)
        "CustomizedMetricSpecification": {
            "MetricName": "ApproximateBacklogSizePerInstance",
            "Namespace": "AWS/SageMaker",
            "Dimensions": [{"Name": "EndpointName", "Value": endpoint_name}],
            "Statistic": "Average",
        },
        "ScaleInCooldown": 300,  # Seconds after a scale-in before another scale-in can start
        "ScaleOutCooldown": 300,  # Seconds to wait for a previous scale-out to take effect
    },
)

#### Option B: Use a model shared with your team or organization
***
You can also use a model that was developed by another team (e.g. Perception) and shared with your team. Models can be shared among the teams in an organization via SageMaker JumpStart. The following screenshots show some examples. For more details on how to share a JumpStart model, visit the [documentation page](https://docs.aws.amazon.com/sagemaker/latest/dg/jumpstart-content-sharing.html).


* You can find the shared models on the SageMaker Studio JumpStart page, which you can access from the Home menu.

![Jumpstart-shared-discover.png]()




* You can deploy the shared model from the model details page. Review the configuration and click Deploy.


* Deployment takes a few minutes. When it is complete, the endpoint status changes to "In Service", and this page also shows the ARN of the deployed endpoint. Copy the endpoint name (the part of the ARN after `endpoint/`) and move to the next step.

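The Option B cell below expects an endpoint name, while the Studio page shows the endpoint ARN. A minimal sketch, assuming the standard SageMaker endpoint ARN layout `arn:aws:sagemaker:<region>:<account-id>:endpoint/<endpoint-name>`; `endpoint_name_from_arn` is a hypothetical helper and the ARN in the usage line is made up.

```python
def endpoint_name_from_arn(endpoint_arn: str) -> str:
    """Return the endpoint name from a SageMaker endpoint ARN."""
    # Split into at most 6 fields: arn, partition, service, region, account, resource.
    parts = endpoint_arn.split(":", 5)
    if len(parts) != 6 or parts[2] != "sagemaker" or not parts[5].startswith("endpoint/"):
        raise ValueError(f"Not a SageMaker endpoint ARN: {endpoint_arn}")
    return parts[5][len("endpoint/"):]


# Usage with a made-up ARN:
print(endpoint_name_from_arn("arn:aws:sagemaker:us-west-2:111122223333:endpoint/my-shared-model"))
```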



In [None]:
from sagemaker.predictor import Predictor
from sagemaker.predictor_async import AsyncPredictor

endpoint_name = "<your-endpoint-name>"  # paste the name of the endpoint deployed from the shared model
base_model_predictor = AsyncPredictor(predictor=Predictor(endpoint_name=endpoint_name, sagemaker_session=sess))

#### Option C: Use an existing endpoint
***
If the model is already deployed, we can create the predictor from the endpoint name. If not, go back to Option A or B to deploy the model.

In [None]:
from sagemaker.predictor import Predictor
from sagemaker.predictor_async import AsyncPredictor

endpoint_name = 'jumpstart-example-infer-mxnet-is-mask-r-2023-02-27-21-19-55-096'  # replace with your endpoint name
base_model_predictor = AsyncPredictor(predictor=Predictor(endpoint_name=endpoint_name, sagemaker_session=sess))

### 2.2. Download images for annotations 
---
In this step, we download the images that need to be annotated. This notebook uses the Ford Multi-AV Seasonal dataset, but you can use your own images that need labeling. We use the JumpStart model to pre-label the images.

* Reference: https://arxiv.org/abs/2003.07969
* Ford Multi-AV Seasonal Dataset was accessed on DATE from https://registry.opendata.aws/ford-multi-av-seasonal

In [None]:
%%time
!aws s3 cp --no-sign-request s3://ford-multi-av-seasonal/2018-04-17/V2/Log1/2018-04-17-V2-Log1-FL.tar.gz ./data/

In [None]:
%%time
# Recreate the working directories relative to the notebook (the later cells use data/...)
!rm -rf data/images data/processedimages data/predictions data/segmentation
!mkdir -p data/images data/processedimages data/predictions data/segmentation

!tar -xzf data/2018-04-17-V2-Log1-FL.tar.gz -C data/images --no-same-owner

#### Resize the images locally

This uses ImageMagick's `mogrify` to resize the images in batch. The first cell below (commented out) resizes the images one after another; the second cell uses GNU Parallel to speed up the resizing.

To run the second cell, install GNU Parallel first by running the following command in a terminal for the SageMaker image:

```
conda install -c conda-forge parallel
```


In [None]:
#%%time
#!mogrify -path data/processedimages -format png -resize 559x536! data/images/*.png 

In [None]:
%%time
!ls data/images/*.png | parallel --jobs 3 mogrify -path data/processedimages -format png -resize 559x536! {} 
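If GNU Parallel isn't available, the same fan-out can be done from Python. This is a minimal sketch that assumes ImageMagick's `mogrify` is on the `PATH` (as the cells above already do) and reuses their flags; `mogrify_command` and `resize_all` are hypothetical helpers. A thread pool is enough here because each worker only waits on a `mogrify` subprocess.

```python
import subprocess
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, List, Sequence


def mogrify_command(image_path: str, out_dir: str = "data/processedimages",
                    size: str = "559x536!") -> List[str]:
    # Same flags as the GNU Parallel cell; the trailing "!" forces the exact size,
    # ignoring the original aspect ratio.
    return ["mogrify", "-path", out_dir, "-format", "png", "-resize", size, image_path]


def run_mogrify(cmd: List[str]) -> int:
    # Raises CalledProcessError if mogrify exits with a non-zero status.
    return subprocess.run(cmd, check=True).returncode


def resize_all(paths: Sequence[str], jobs: int = 3,
               run: Callable[[List[str]], int] = run_mogrify) -> List[int]:
    # One mogrify process per image, at most `jobs` at once (like `parallel --jobs 3`).
    with ThreadPoolExecutor(max_workers=jobs) as pool:
        return list(pool.map(run, [mogrify_command(p) for p in paths]))


# Usage (requires ImageMagick's mogrify on the PATH):
# import glob
# resize_all(sorted(glob.glob("data/images/*.png")))
```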

### 2.3. Single prediction example

Note that it may take some time for an Async endpoint to write the inference output back to S3.

In [None]:
!ls data/processedimages | tail -5

In [None]:
input_1_location = 'data/processedimages/1523946753799396.png'
input_1_s3_location = upload_image(sess,input_1_location,sess.default_bucket())

In [None]:
async_response = base_model_predictor.predict_async(input_path=input_1_s3_location)
output_location = async_response.output_path
print(f'Output path for single prediction is {output_location}')

In [None]:
#Wait until the object is available in S3
wr.s3.wait_objects_exist([output_location])

In [None]:
#Copy object locally
!aws s3 cp {output_location} data/single.out

### 2.4. Display model predictions
---
Next, we plot the bounding boxes and masks on top of the image. For this, we adapt a function from [GluonCV](https://cv.gluon.ai/_modules/gluoncv/utils/viz/bbox.html#plot_bbox).

In [None]:
plot_response('data/single.out')

### 2.5. Send images to Async endpoint

In the next step, we send a batch of images to the Async endpoint. You can control how many images are sent to the endpoint by changing the variable `max_images`.

---

In [None]:
import glob
import time

max_images = 10
input_locations, output_locations = [], []

# Upload at most max_images processed images and invoke the async endpoint for each one.
for file in sorted(glob.glob("data/processedimages/*.png"))[:max_images]:
    input_1_s3_location = upload_image(sess, file, sess.default_bucket())
    input_locations.append(input_1_s3_location)
    async_response = base_model_predictor.predict_async(input_path=input_1_s3_location)
    output_locations.append(async_response.output_path)

In [None]:
#Wait for objects to be available in S3
wr.s3.wait_objects_exist(output_locations,delay=5,max_attempts=2*max_images)
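`wr.s3.wait_objects_exist` waits for every output object. An invocation that fails generally does not write an object to its output path, so as an alternative you may prefer to poll and get back the list of outputs that are still missing. A minimal sketch; `wait_for_outputs` is a hypothetical helper and `exists` is any callable you pass in, for example `wr.s3.does_object_exist`.

```python
import time
from typing import Callable, Iterable, List


def wait_for_outputs(paths: Iterable[str], exists: Callable[[str], bool],
                     delay: float = 5.0, max_attempts: int = 20,
                     sleep: Callable[[float], None] = time.sleep) -> List[str]:
    """Poll until every path exists or attempts run out; return the paths still missing."""
    pending = list(paths)
    for attempt in range(max_attempts):
        # Only re-check the outputs that have not appeared yet.
        pending = [p for p in pending if not exists(p)]
        if not pending:
            break
        if attempt < max_attempts - 1:
            sleep(delay)
    return pending


# Usage in this notebook:
# missing = wait_for_outputs(output_locations, exists=wr.s3.does_object_exist)
```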

### 3.0. Convert the model output annotations to SageMaker Ground Truth format
---
Next, we take the output from the JumpStart model and convert the annotations to the SageMaker Ground Truth format.

In [None]:
image_bucket=sess.default_bucket()
image_prefix = "asyncinference/images"
manifest_file_name="annotations.manifest"
convert_to_sm_gt_manifest(output_locations,image_bucket,image_prefix,manifest_file_name)
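`convert_to_sm_gt_manifest` comes from the notebook's `utils` module, which is not shown here. For orientation only, the sketch below builds one manifest line in the shape of Ground Truth's bounding-box (object detection) output, which a verification or adjustment job reads through its label attribute. It assumes the model returns boxes as normalized `[x_min, y_min, x_max, y_max]` values; `to_gt_manifest_line` and its default `job_name` are hypothetical, and the field names should be checked against the Ground Truth output-data documentation rather than taken as what `utils` actually does.

```python
import json
from datetime import datetime, timezone
from typing import Dict, Sequence


def to_gt_manifest_line(
    image_uri: str,
    boxes: Sequence[Sequence[float]],
    class_ids: Sequence[int],
    scores: Sequence[float],
    class_map: Dict[int, str],
    width: int = 559,   # size produced by the resize step above
    height: int = 536,
    label: str = "prelabel",
    job_name: str = "labeling-job/jumpstart-prelabel",  # hypothetical job name
) -> str:
    """Build one JSON Lines manifest entry in a Ground Truth bounding-box style."""
    annotations = []
    for (x_min, y_min, x_max, y_max), class_id in zip(boxes, class_ids):
        # Convert normalized corners to pixel left/top/width/height.
        left, top = round(x_min * width), round(y_min * height)
        annotations.append({
            "class_id": int(class_id),
            "left": left,
            "top": top,
            "width": round(x_max * width) - left,
            "height": round(y_max * height) - top,
        })
    entry = {
        "source-ref": image_uri,
        label: {
            "image_size": [{"width": width, "height": height, "depth": 3}],
            "annotations": annotations,
        },
        f"{label}-metadata": {
            "objects": [{"confidence": float(score)} for score in scores],
            "class-map": {str(k): v for k, v in class_map.items()},
            "type": "groundtruth/object-detection",
            "human-annotated": "no",
            "creation-date": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S.%f"),
            "job-name": job_name,
        },
    }
    return json.dumps(entry)  # one line per image in the .manifest file
```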

Upload the manifest file to S3

---
Next, we upload the generated manifest file to S3, where it is used for bounding box and label verification.
Provide the S3 bucket for the manifest file in the cell below.

In [None]:
manifest_bucket = 'sm-gt-label-490491240736'  # replace with your own S3 bucket
s3_manifest_file = upload_file(sess,manifest_file_name,manifest_bucket,'manifest')
print(f"Labeling manifest file uploaded to {s3_manifest_file}")

### 4.0. Create Bounding Box verification job
---
In this section, we create a [bounding box verification job](https://docs.aws.amazon.com/sagemaker/latest/dg/sms-verification-data.html#sms-data-verify-start-api). We upload the SageMaker Ground Truth UI template and the label categories file, and then create the verification job. The job uses a private workforce to perform the labeling; change this if you use another type of workforce. For more details, refer to the CreateLabelingJob API [here](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateLabelingJob.html).

#### 4.1. Execute API call to create the job

In [None]:
ui_template_file="instructions.template"
label_categories_file = "label_categories.json"
ui_template_uri= upload_file(sess,ui_template_file,manifest_bucket,'uitemplate')
label_caegories_json_uri= upload_file(sess,label_categories_file,manifest_bucket,'uitemplate')

print(f"UI template uploaded to {ui_template_uri}")
print(f"Label categories uploaded to {label_caegories_json_uri}")

---
In the cell below, specify the parameters required to start the verification job:
* `labeling_job_iam_role_arn` - the ARN of the IAM role that the verification job assumes.
* `private_workforce_arn` - the ARN of the private workforce that performs the labeling.
* `pre_human_lambda` - the Lambda function that preprocesses each data object before the task is assigned to workers. For more details, refer to the [documentation](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_HumanTaskConfig.html#SageMaker-Type-HumanTaskConfig-PreHumanTaskLambdaArn).
* `annotation_consolidation_lambda` - the Lambda function that consolidates the annotations from multiple workers. For more details, refer to the [documentation](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_AnnotationConsolidationConfig.html#SageMaker-Type-AnnotationConsolidationConfig-AnnotationConsolidationLambdaArn).

In [None]:
labeling_job_name = 'async-jumpstart-bbox-labeling-job-' + str(int(time.time()))
manifest_label_name = 'prelabel'
labeling_job_iam_role_arn = 'arn:aws:iam::490491240736:role/service-role/AmazonSageMaker-ExecutionRole-20230222T140752'  # replace with your IAM role ARN
private_workforce_arn = 'arn:aws:sagemaker:us-west-2:490491240736:workteam/private-crowd/inhouse-team'  # replace with your private workteam ARN
label_output = f's3://{manifest_bucket}/gtoutput'
arn_region_map = {
 "us-west-2": "081040173940",
 "us-east-1": "432418664414",
 "us-east-2": "266458841044",
 "eu-west-1": "568282634449",
 "eu-west-2": "487402164563",
 "ap-northeast-1": "477331159723",
 "ap-northeast-2": "845288260483",
 "ca-central-1": "918755190332",
 "eu-central-1": "203001061592",
 "ap-south-1": "565803892007",
 "ap-southeast-1": "377565633583",
 "ap-southeast-2": "454466003867",
}

#Provide Lambda function for preprocessing
#Ref: https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_HumanTaskConfig.html#SageMaker-Type-HumanTaskConfig-PreHumanTaskLambdaArn
pre_human_lambda = f'arn:aws:lambda:{aws_region}:{arn_region_map[aws_region]}:function:PRE-AdjustmentBoundingBox'

#Provide Lambda function for annotation consolidation
#Ref: https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_AnnotationConsolidationConfig.html#SageMaker-Type-AnnotationConsolidationConfig-AnnotationConsolidationLambdaArn
annotation_consolidation_lambda = f'arn:aws:lambda:{aws_region}:{arn_region_map[aws_region]}:function:ACS-AdjustmentBoundingBox'
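The cell above raises a bare `KeyError` when `aws_region` is missing from `arn_region_map`. A minimal sketch of a helper that builds both Lambda ARNs from the same map and fails with a clearer message; `adjustment_lambda_arns` is hypothetical, and the `task` parameter assumes the built-in functions follow the `PRE-<task>` / `ACS-<task>` naming seen above (the HumanTaskConfig documentation lists the other task names).

```python
from typing import Dict, Tuple


def adjustment_lambda_arns(region: str, account_by_region: Dict[str, str],
                           task: str = "AdjustmentBoundingBox") -> Tuple[str, str]:
    """Return (pre-human-task Lambda ARN, annotation-consolidation Lambda ARN)."""
    try:
        account = account_by_region[region]
    except KeyError:
        supported = ", ".join(sorted(account_by_region))
        raise ValueError(
            f"No Ground Truth Lambda account listed for {region}; supported: {supported}"
        ) from None
    prefix = f"arn:aws:lambda:{region}:{account}:function:"
    return prefix + f"PRE-{task}", prefix + f"ACS-{task}"


# Usage in this notebook:
# pre_human_lambda, annotation_consolidation_lambda = adjustment_lambda_arns(aws_region, arn_region_map)
```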

In [None]:
sagemaker_client = boto3.client("sagemaker")

# Create the labeling job
response = sagemaker_client.create_labeling_job(
    LabelingJobName=labeling_job_name,
    LabelAttributeName=manifest_label_name,
    InputConfig={
        'DataSource': {
            'S3DataSource': {
                'ManifestS3Uri': s3_manifest_file
            }
        },
        'DataAttributes': {
            'ContentClassifiers': [
                'FreeOfPersonallyIdentifiableInformation',
                'FreeOfAdultContent',
            ]
        }
    },
    OutputConfig={
        'S3OutputPath': label_output,
        # 'KmsKeyId': 'string'  # If you want to encrypt the output, provide a KMS key here
    },
    RoleArn=labeling_job_iam_role_arn,
    LabelCategoryConfigS3Uri=label_caegories_json_uri,
    StoppingConditions={
        'MaxHumanLabeledObjectCount': 123,  # Upper bound on human-labeled objects; adjust for your dataset
        'MaxPercentageOfInputDatasetLabeled': 100
    },
    HumanTaskConfig={
        'WorkteamArn': private_workforce_arn,
        'UiConfig': {
            'UiTemplateS3Uri': ui_template_uri
        },
        'PreHumanTaskLambdaArn': pre_human_lambda,
        'TaskKeywords': [
            'Bounding Box',
        ],
        'TaskTitle': 'Bounding Box task',
        'TaskDescription': 'Draw bounding boxes around objects in an image',
        'NumberOfHumanWorkersPerDataObject': 2,
        'TaskTimeLimitInSeconds': 3600,
        # 'TaskAvailabilityLifetimeInSeconds': 1000,
        'MaxConcurrentTaskCount': 5,
        'AnnotationConsolidationConfig': {
            'AnnotationConsolidationLambdaArn': annotation_consolidation_lambda
        }
    }
)

In [None]:
if response['ResponseMetadata']['HTTPStatusCode'] == 200:
    print(f"Bounding box verification job started successfully.\nJob ARN: {response['LabelingJobArn']}")
else:
    print("Error with the verification job. Check the response below.")
    print(response)

#### 4.2. Complete verification

In this step, you complete the verification in the labeling portal. For more details, refer to the SageMaker Ground Truth documentation [here](https://docs.aws.amazon.com/sagemaker/latest/dg/sms-getting-started-step3.html).

When you access the portal as a workforce member, you can see the bounding boxes created by the JumpStart model and adjust them as required.

![GT-Verification.png](images/GT-Verification.png)

### 5.0. Clean up the endpoint
---
This step is optional. We clean up by deleting the endpoint and the model, and by removing the images processed locally.
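If you deployed with Option A, the endpoint variant is also registered with Application Auto Scaling. A minimal sketch for removing that registration before deleting the endpoint, using boto3's `deregister_scalable_target` with the same resource ID format as the autoscaling cell; deregistering a scalable target also deletes its scaling policies. `deregister_endpoint_autoscaling` is a hypothetical helper, and you should expect an error for endpoints that were never registered (Options B and C).

```python
from typing import Any


def deregister_endpoint_autoscaling(client: Any, endpoint_name: str,
                                    variant_name: str = "AllTraffic") -> None:
    """Deregister the endpoint variant that Option A registered as a scalable target."""
    client.deregister_scalable_target(
        ServiceNamespace="sagemaker",
        ResourceId=f"endpoint/{endpoint_name}/variant/{variant_name}",
        ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    )


# Usage in this notebook (before deleting the endpoint):
# deregister_endpoint_autoscaling(boto3.client("application-autoscaling"), endpoint_name)
```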

In [None]:
# Delete the SageMaker endpoint
base_model_predictor.delete_model()
base_model_predictor.delete_endpoint()

In [None]:
!rm -r data