### Improve high value research with Hugging Face and Amazon SageMaker asynchronous endpoints

**Table of Contents**

* [Background](#background)
* [Architecture](#overview)
* [Section 1 - Setup](#setup) 
 * [Create Model](#createmodel)
 * [Create EndpointConfig](#endpoint-config)
 * [Create Endpoint](#create-endpoint)
* [Section 2 - Using the Endpoint](#endpoint) 
 * [Invoke Endpoint](#invoke-endpoint)
* [Section 3 - Clean up](#clean)

### Background 
Amazon SageMaker Asynchronous Inference is a new capability in SageMaker that queues incoming requests and processes them asynchronously.
SageMaker currently offers 3 inference options for customers to deploy machine learning models:
1. Real-time inference, for low-latency workloads.
2. Batch transform, an offline option to process inference requests on batches of data available upfront.
3. Asynchronous inference.

Real-time inference suits workloads with payloads smaller than 6 MB that need each inference request processed within 60 seconds. Batch transform suits offline inference on batches of data.

Asynchronous inference is a new inference option for near real-time inference needs. Requests can take up to 15 minutes to process and can have payloads of up to 1 GB. Asynchronous inference suits workloads with relaxed latency requirements that do not need sub-second responses. For example, you might need to run inference on a large image of several MB within 5 minutes. Asynchronous inference endpoints also help you control costs: you can scale the endpoint instance count down to zero when it is idle, so you only pay while your endpoints are processing requests.
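
The size and time limits above can be summarized as a small decision helper. This is a hypothetical sketch: `choose_inference_option` is not a SageMaker API. It hard-codes the limits stated in this notebook, which may change over time, so check the current SageMaker documentation before relying on them.

```python
from typing import Optional

# Limits as stated in this notebook; treat them as assumptions that may change.
REALTIME_MAX_PAYLOAD_MB = 6
REALTIME_MAX_SECONDS = 60
ASYNC_MAX_PAYLOAD_MB = 1024      # 1 GB
ASYNC_MAX_SECONDS = 15 * 60      # 15 minutes


def choose_inference_option(payload_mb: float,
                            processing_seconds: float,
                            data_available_upfront: bool = False) -> Optional[str]:
    """Hypothetical helper mapping a workload to one of the three SageMaker options."""
    if data_available_upfront:
        # Offline processing of a whole dataset that already exists.
        return "batch transform"
    if payload_mb < REALTIME_MAX_PAYLOAD_MB and processing_seconds <= REALTIME_MAX_SECONDS:
        return "real-time"
    if payload_mb <= ASYNC_MAX_PAYLOAD_MB and processing_seconds <= ASYNC_MAX_SECONDS:
        return "asynchronous"
    # Exceeds the limits of both online options.
    return None


print(choose_inference_option(0.5, 1))    # small request, fast model
print(choose_inference_option(20, 300))   # several-MB image processed within 5 minutes
```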


### Architecture 
Asynchronous inference endpoints have many similarities (and some key differences) compared to real-time endpoints. You create them the same way as real-time endpoints: first a model, then an endpoint configuration, then an endpoint. However, asynchronous endpoints also take configuration parameters of their own, which we explore below.

Invoking an asynchronous endpoint differs from invoking a real-time endpoint. Rather than passing the request payload inline, you upload the payload to Amazon S3 and pass its Amazon S3 URI as part of the request. When SageMaker receives the request, it immediately returns a response with the output location where the result will be placed once processed. Internally, SageMaker keeps these requests in a queue and processes them from there. When you create the endpoint configuration, you can optionally specify Amazon SNS topics that receive success or error notifications. Once you are notified that your inference request was processed successfully, you can read the result from the output Amazon S3 location.

In this example: 

* We deploy a pretrained Hugging Face model to [SageMaker hosting services](https://docs.aws.amazon.com/sagemaker/latest/dg/deploy-model.html). This automatically provisions an asynchronous endpoint that hosts your model and serves predictions in near real time.
* We demonstrate the new capabilities: an internal queue with user-defined concurrency, and completion notifications. We configure instance autoscaling to scale down to 0 when traffic subsides and to scale back up as the request queue fills.
* We also use [Amazon CloudWatch](https://aws.amazon.com/cloudwatch/) metrics to monitor the queue size, total processing time, and invocations processed. 





The end-to-end workflow is:

1. Our pre-trained [PEGASUS](https://huggingface.co/google/pegasus-large) ML model is first hosted on the scaling endpoint.
2. The user or some other mechanism uploads the article to be summarized to an input S3 bucket.
3. The user or some other mechanism invokes the endpoint and is immediately returned an output Amazon S3 location where the inference is written.
4. After the inference is complete, the result is saved to the output S3 bucket.
5. An Amazon [Simple Notification Service](https://aws.amazon.com/sns/?whats-new-cards.sort-by=item.additionalFields.postDateTime&whats-new-cards.sort-order=desc) (SNS) notification is sent to tell the user whether the request succeeded or failed.
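
The numbered steps above can be modeled with a toy, in-process simulation:

* A `queue.Queue` plays the role of the internal request queue.
* A fixed number of worker threads plays the role of the per-instance concurrency limit.
* A dict stands in for the output S3 location.
* A list stands in for the SNS topics.

Everything here is hypothetical: `simulate_async_endpoint`, the `example-bucket` URIs, and the upper-casing "inference". The simulation only illustrates the shape of the request/response flow, not how SageMaker is implemented.

```python
import queue
import threading
import time
import uuid


def simulate_async_endpoint(payloads, max_concurrent=2, work_seconds=0.01):
    """Toy model of the asynchronous flow (not SageMaker code)."""
    requests = queue.Queue()
    outputs = {}          # stands in for the output S3 prefix
    notifications = []    # stands in for the SNS success topic
    lock = threading.Lock()
    state = {"in_flight": 0, "peak": 0}

    def worker():
        while True:
            item = requests.get()
            if item is None:  # sentinel: no more requests
                return
            location, payload = item
            with lock:
                state["in_flight"] += 1
                state["peak"] = max(state["peak"], state["in_flight"])
            time.sleep(work_seconds)   # pretend to run the model
            result = payload.upper()   # pretend inference result
            with lock:
                state["in_flight"] -= 1
                outputs[location] = result                     # step 4: write result
                notifications.append(("success", location))   # step 5: notify

    # max_concurrent plays the role of a per-instance concurrency limit.
    workers = [threading.Thread(target=worker) for _ in range(max_concurrent)]
    for w in workers:
        w.start()

    # Step 3: each invocation is queued and immediately returns an output location.
    locations = []
    for payload in payloads:
        location = f"s3://example-bucket/output/{uuid.uuid4()}.out"
        requests.put((location, payload))
        locations.append(location)

    # One sentinel per worker, queued after all real requests (FIFO order).
    for _ in workers:
        requests.put(None)
    for w in workers:
        w.join()
    return locations, outputs, notifications, state["peak"]


locations, outputs, notifications, peak = simulate_async_endpoint(
    ["article one", "article two", "article three"], max_concurrent=2)
print(peak, len(outputs), len(notifications))
```

The caller gets output locations back before any result exists. This is why the notebook later has to poll the output location or wait for the SNS notification.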


---
## 1. Setup 

First, make sure you have an up-to-date version of the SageMaker Python SDK, which includes the latest SageMaker features:

In [None]:
!python -m pip install --upgrade pip --quiet
!pip install -U awscli --quiet
!pip install --upgrade sagemaker --quiet

Import the required Python libraries:

In [None]:
from time import gmtime, strftime
from sagemaker import image_uris
import sagemaker
import logging
import boto3
import json
import urllib.parse  # used by get_output to split S3 URIs
import datetime
import time
import os
import sys
import io

In [None]:
region = sagemaker.Session().boto_region_name
role = sagemaker.get_execution_role()
boto3.setup_default_session(region_name=region)
boto_session = boto3.Session(region_name=region)
sm_session = sagemaker.session.Session()
sm_client = boto_session.client('sagemaker')
sm_runtime = boto_session.client("sagemaker-runtime")
s3_bucket = sm_session.default_bucket()
sns_client = boto3.client('sns')
print(f'Region = {region}')
print(f'Role = {role}')
print(f"We will use S3 bucket : '{s3_bucket}' for storing all resources related to this notebook")
bucket_prefix = "async-inference-demo"

Specify your IAM role. Go to the AWS [IAM console](https://console.aws.amazon.com/iam/home) and add the following policies to your IAM role:
* `AmazonSageMakerFullAccess` (AWS managed policy)
* Amazon S3 access: Apply this policy to get and put objects in your Amazon S3 bucket. Replace `bucket_name` with the name of your Amazon S3 bucket:

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:AbortMultipartUpload",
                "s3:ListBucket"
            ],
            "Effect": "Allow",
            "Resource": [
                "arn:aws:s3:::bucket_name",
                "arn:aws:s3:::bucket_name/*"
            ]
        }
    ]
}
```

* (Optional) Amazon SNS access: Add `sns:Publish` on the topics you define. Apply this if you plan to use Amazon SNS to receive notifications.

```json
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "sns:Publish"
            ],
            "Effect": "Allow",
            "Resource": "arn:aws:sns:us-east-2:123456789012:MyTopic"
        }
    ]
}
```

* (Optional) AWS KMS decrypt and encrypt permissions, if your Amazon S3 bucket is encrypted.

Specify your SageMaker IAM role (`role`) and Amazon S3 bucket (`s3_bucket`). You can optionally use the default SageMaker session IAM role and Amazon S3 bucket, as the setup cell above does. Make sure the role you use has the necessary permissions for SageMaker, Amazon S3, and optionally Amazon SNS.
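
To avoid hand-editing JSON, the S3 and SNS statements listed above can also be generated in code. This is a minimal sketch: `build_async_inference_policy` is a hypothetical helper, and the bucket name and topic ARN in the example are placeholders. It grants `s3:ListBucket` on the bucket ARN and the object actions on `bucket_name/*`, because those actions apply to different resource types.

```python
import json


def build_async_inference_policy(bucket_name, sns_topic_arns=()):
    """Build an IAM policy document with the S3 (and optional SNS) access described above."""
    statements = [
        {
            "Effect": "Allow",
            "Action": [
                "s3:GetObject",
                "s3:PutObject",
                "s3:AbortMultipartUpload",
                "s3:ListBucket",
            ],
            # ListBucket targets the bucket ARN; object actions target bucket_name/*.
            "Resource": [f"arn:aws:s3:::{bucket_name}", f"arn:aws:s3:::{bucket_name}/*"],
        }
    ]
    if sns_topic_arns:
        # Only needed if you use Amazon SNS notifications.
        statements.append(
            {"Effect": "Allow", "Action": ["sns:Publish"], "Resource": list(sns_topic_arns)}
        )
    return {"Version": "2012-10-17", "Statement": statements}


policy = build_async_inference_policy(
    "my-example-bucket",
    ["arn:aws:sns:us-east-2:123456789012:MyTopic"],
)
print(json.dumps(policy, indent=2))
```

You can paste the printed document into the IAM console as an inline policy. Alternatively, pass `json.dumps(policy)` as the `PolicyDocument` argument of the boto3 IAM client's `put_role_policy`.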

### 1.1 Create Model 
Specify the location of the pre-trained model stored in Amazon S3. This notebook uses the [PEGASUS](https://huggingface.co/google/pegasus-large) model as is from Hugging Face, for simplicity. To fine-tune the model on a custom dataset, follow [this blog post](https://aws.amazon.com/blogs/machine-learning/fine-tune-and-host-hugging-face-bert-models-on-amazon-sagemaker/). Feel free to try other sequence-to-sequence models from the [Hugging Face Model Hub](https://huggingface.co/models?pipeline_tag=summarization&sort=downloads). The full Amazon S3 URI is stored in the string variable `model_url`.

In [None]:
model_s3_key = f"{bucket_prefix}/summarization-model.tar.gz"
with open("model/summarization-model.tar.gz", "rb") as model_file:
 boto_session.resource("s3").Bucket(s3_bucket).Object(model_s3_key).upload_fileobj(model_file)
print("Uploaded the model to S3 bucket")

In [None]:
model_url = f"s3://{s3_bucket}/{model_s3_key}"
print(model_url)
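
The upload cell above assumes `model/summarization-model.tar.gz` already exists. As a minimal sketch, assume the model files were first saved to a local directory, for example with the Transformers `save_pretrained` method. You can then produce the archive with Python's `tarfile` module. Two things here are assumptions: that the Hugging Face inference container expects the files at the root of the archive, and the file names in the usage example, which are placeholders. `package_model_dir` is a hypothetical helper.

```python
import os
import tarfile
import tempfile


def package_model_dir(model_dir, archive_path):
    """Pack every file under model_dir into a .tar.gz with paths relative to model_dir."""
    with tarfile.open(archive_path, "w:gz") as tar:
        for root, _dirs, files in os.walk(model_dir):
            for name in sorted(files):
                full_path = os.path.join(root, name)
                # arcname drops the model_dir prefix so files sit at the archive root
                tar.add(full_path, arcname=os.path.relpath(full_path, model_dir))
    return archive_path


# Usage with placeholder files in a temporary directory.
with tempfile.TemporaryDirectory() as tmp:
    src = os.path.join(tmp, "pegasus")
    os.makedirs(src)
    for name in ("config.json", "pytorch_model.bin"):
        with open(os.path.join(src, name), "w") as f:
            f.write("placeholder")
    archive = package_model_dir(src, os.path.join(tmp, "summarization-model.tar.gz"))
    with tarfile.open(archive) as tar:
        members = sorted(tar.getnames())

print(members)
```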

Specify a primary container. For the primary container, you specify three things that the inference code uses when you deploy the model for predictions: the Docker image containing the inference code, the artifacts (from prior training), and a custom environment map. In this example, we retrieve the URI of the Hugging Face inference container image by specifying the framework, its version, and related details. For details on choosing the right container image for your use case, refer to https://github.com/awsdocs/amazon-sagemaker-developer-guide/blob/master/doc_source/ and look in the ECR folder for the Region you are using.

In [None]:
ecr_image = image_uris.retrieve(framework='huggingface', 
 region=region, 
 version='4.6.1', 
 image_scope='inference', 
 base_framework_version='pytorch1.7.1', 
 py_version='py36', 
 container_version='ubuntu18.04', 
 instance_type='ml.m5.xlarge')
print(f"ECR Image:{ecr_image}")

In [None]:
model_name = f"Pegasus-summarization-async-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"

Create a model by specifying the `ModelName`, the `ExecutionRoleArn`, and the `PrimaryContainer`. `ExecutionRoleArn` is the ARN of the IAM role that Amazon SageMaker assumes to access model artifacts and Docker images for deployment.

In [None]:
response = sm_client.create_model(ModelName=model_name, 
 ExecutionRoleArn=role, 
 PrimaryContainer={'Image': ecr_image, 
 'ModelDataUrl': model_url,
 'Environment':{
 'HF_MODEL_ID':'google/pegasus-large',
 'HF_TASK':'summarization',
 'SAGEMAKER_CONTAINER_LOG_LEVEL':'20',
 'SAGEMAKER_REGION':region
 }
 }
 )
model_arn = response['ModelArn']

print(f'Created Model: {model_arn}')

In [None]:
endpoint_config_name = f"Pegasus-summarization-async-config-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"

print(endpoint_config_name)

Create the error and success SNS topics:

In [None]:
response = sns_client.create_topic(Name="Async-ErrorTopic")
error_topic= response['TopicArn']
print(error_topic)

In [None]:
response = sns_client.create_topic(Name="Async-SuccessTopic")
success_topic = response['TopicArn']
print(success_topic)

Optionally, subscribe to the SNS topics:

In [None]:
##Note: Replace with your email id

# email_id = 'youremail@domain.com'
# email_sub_1 = sns_client.subscribe(
# TopicArn=success_topic,
# Protocol='email',
# Endpoint=email_id)

# email_sub_2 = sns_client.subscribe(
# TopicArn=error_topic,
# Protocol='email',
# Endpoint=email_id)

# Note: You need to confirm each subscription by clicking the link in the email you receive.

### 1.2 Create EndpointConfig 

Once you have a model, create an endpoint configuration with `CreateEndpointConfig`. Amazon SageMaker hosting services uses this configuration to deploy models. The configuration identifies one or more models created with the `CreateModel` API, together with the resources you want Amazon SageMaker to provision to host them. Specify the `AsyncInferenceConfig` object and provide an output Amazon S3 location in `OutputConfig`. You can optionally specify Amazon SNS topics that receive notifications about prediction results.

In [None]:
response = sm_client.create_endpoint_config(
 EndpointConfigName=endpoint_config_name,
 ProductionVariants=[
 {
 "VariantName": "variant1",
 "ModelName": model_name,
 "InstanceType": "ml.m5.xlarge",
 "InitialInstanceCount": 1
 }
 ],
 AsyncInferenceConfig={
 "OutputConfig": {
 "S3OutputPath": f"s3://{s3_bucket}/{bucket_prefix}/output",
 # Optionally specify Amazon SNS topics
 "NotificationConfig": {
 "SuccessTopic": success_topic,
 "ErrorTopic": error_topic,
 }
 },
 "ClientConfig": {
 "MaxConcurrentInvocationsPerInstance": 2
 }
 }
)
endpoint_config_arn = response['EndpointConfigArn']
print(f"Created EndpointConfig: {endpoint_config_arn}")
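
Because `NotificationConfig` and `ClientConfig` are optional, you can assemble the `AsyncInferenceConfig` argument with a small helper that includes them only when values are given. The helper `build_async_inference_config` is hypothetical. The bucket and topic ARNs in the example are placeholders.

```python
def build_async_inference_config(s3_output_path,
                                 max_concurrent_per_instance=None,
                                 success_topic=None,
                                 error_topic=None):
    """Assemble the AsyncInferenceConfig dict used by create_endpoint_config."""
    output_config = {"S3OutputPath": s3_output_path}

    # Only add NotificationConfig when at least one SNS topic is supplied.
    notification_config = {}
    if success_topic:
        notification_config["SuccessTopic"] = success_topic
    if error_topic:
        notification_config["ErrorTopic"] = error_topic
    if notification_config:
        output_config["NotificationConfig"] = notification_config

    config = {"OutputConfig": output_config}
    if max_concurrent_per_instance is not None:
        config["ClientConfig"] = {
            "MaxConcurrentInvocationsPerInstance": max_concurrent_per_instance
        }
    return config


no_sns = build_async_inference_config("s3://my-example-bucket/async-inference-demo/output")
with_sns = build_async_inference_config(
    "s3://my-example-bucket/async-inference-demo/output",
    max_concurrent_per_instance=2,
    success_topic="arn:aws:sns:us-east-2:123456789012:Async-SuccessTopic",
    error_topic="arn:aws:sns:us-east-2:123456789012:Async-ErrorTopic",
)
print(no_sns)
print(with_sns)
```

In the cell above, the call would be `AsyncInferenceConfig=build_async_inference_config(f"s3://{s3_bucket}/{bucket_prefix}/output", 2, success_topic, error_topic)`.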

### 1.3 Create Asynchronous Endpoint 

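Unlike real-time hosted endpoints, asynchronous endpoints can scale down to 0 instances when you set the minimum capacity to 0. With this feature, the endpoint runs no instances when there is no traffic, and you pay only when payloads arrive. The endpoint starts with the single instance set by `InitialInstanceCount` in the endpoint configuration. The minimum capacity of 0 is set when autoscaling is enabled later in this notebook. Let's create the asynchronous endpoint: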

In [None]:
endpoint_name = f"async-summarization-inference-huggingface-{strftime('%Y-%m-%d-%H-%M-%S', gmtime())}"

response = sm_client.create_endpoint(EndpointName=endpoint_name, 
 EndpointConfigName=endpoint_config_name)

print(f'Creating Endpoint: {endpoint_name}')

In [None]:
waiter = sm_client.get_waiter("endpoint_in_service")
print("Waiting for endpoint to create...")
waiter.wait(EndpointName=endpoint_name)
resp = sm_client.describe_endpoint(EndpointName=endpoint_name)
print(f"Endpoint Status: {resp['EndpointStatus']}")

--- 
## 2. Using the Asynchronous Endpoint 

### 2.1 Uploading the Request Payload 

Upload the input data, in JSON format, to the input location in Amazon S3:

In [None]:
input_s3_key = f"{bucket_prefix}/input/input.json"
with open("data/input.json", "rb") as input_file:
 boto_session.resource("s3").Bucket(s3_bucket).Object(input_s3_key).upload_fileobj(input_file)
print("Uploaded the input data to S3 bucket")

In [None]:
input_s3_location = f"s3://{s3_bucket}/{bucket_prefix}/input/input.json"
print(input_s3_location)
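
The notebook does not show the contents of `data/input.json`. Assume it follows the Hugging Face Inference Toolkit convention: an `"inputs"` field holding the text, plus an optional `"parameters"` field for generation settings. You could then build a payload in code as below. `make_summarization_payload` is hypothetical, and the `max_length` and `min_length` keys are assumptions about what the summarization pipeline accepts.

```python
import json


def make_summarization_payload(article, max_length=None, min_length=None):
    """Return UTF-8 JSON bytes in an assumed {"inputs": ..., "parameters": ...} shape."""
    payload = {"inputs": article}
    parameters = {}
    if max_length is not None:
        parameters["max_length"] = max_length
    if min_length is not None:
        parameters["min_length"] = min_length
    if parameters:
        # Only include "parameters" when a generation setting was given.
        payload["parameters"] = parameters
    return json.dumps(payload).encode("utf-8")


body = make_summarization_payload("Long article text to summarize.", max_length=64)
print(body.decode("utf-8"))
```

You could upload the resulting bytes the same way the cell above uploads the file, for example with `upload_fileobj(io.BytesIO(body))`.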

### 2.2 Invoke Endpoint

Get inferences from the model hosted at your asynchronous endpoint with `InvokeEndpointAsync`. Specify the location of your inference data in the `InputLocation` field and the name of your endpoint in `EndpointName`. The response contains the output Amazon S3 location where the result will be placed.

In [None]:
response = sm_runtime.invoke_endpoint_async(
 EndpointName=endpoint_name, InputLocation=input_s3_location,ContentType="application/json"
)
print(response)
output_location = response['OutputLocation']
print(f"OutputLocation: {output_location}")

In [None]:
from botocore.exceptions import ClientError

def get_output(output_location):
    """Poll the output S3 location until the inference result is available."""
    output_url = urllib.parse.urlparse(output_location)
    bucket = output_url.netloc
    key = output_url.path[1:]
    while True:
        try:
            return sm_session.read_s3_file(bucket=bucket, key_prefix=key)
        except ClientError as e:
            # The output object does not exist until processing has finished
            if e.response['Error']['Code'] == 'NoSuchKey':
                print("waiting for output...")
                time.sleep(2)
                continue
            raise
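
`get_output` polls every 2 seconds and never gives up. A minimal sketch of a variant with exponential backoff and a timeout is shown here. `poll_with_backoff` is hypothetical, and `sleep` and `clock` are injectable so that the example runs without AWS.

```python
import time


def poll_with_backoff(fetch, is_not_ready, timeout_s=900.0, initial_delay_s=2.0,
                      max_delay_s=30.0, sleep=time.sleep, clock=time.monotonic):
    """Call fetch() until it succeeds, doubling the wait between attempts.

    is_not_ready(exc) decides whether an exception means "result not written yet"
    (retry) or a real error (re-raise). Raises TimeoutError past the deadline.
    """
    deadline = clock() + timeout_s
    delay = initial_delay_s
    while True:
        try:
            return fetch()
        except Exception as exc:
            if not is_not_ready(exc):
                raise
        if clock() + delay > deadline:
            raise TimeoutError(f"result not ready after {timeout_s} seconds")
        sleep(delay)
        delay = min(delay * 2, max_delay_s)


# Usage with a fake fetch that fails twice before returning a result.
attempts = {"count": 0}

def fake_fetch():
    attempts["count"] += 1
    if attempts["count"] < 3:
        raise KeyError("NoSuchKey")  # stands in for the S3 "not found" error
    return "summary text"

result = poll_with_backoff(fake_fetch, lambda exc: isinstance(exc, KeyError),
                           sleep=lambda seconds: None)
print(result, attempts["count"])
```

To use this in the notebook, two substitutions would work:

* `fetch`: `lambda: sm_session.read_s3_file(bucket=bucket, key_prefix=key)`.
* `is_not_ready`: a check for `isinstance(exc, ClientError) and exc.response['Error']['Code'] == 'NoSuchKey'`, the same check `get_output` makes.

The 900-second default is arbitrary. Requests can also wait in the queue before processing starts, so a heavily loaded endpoint may need a longer timeout.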

In [None]:
# Print the summarized output

output = get_output(output_location)
print(f"Summarized text is: {output}")

### 2.3 Trigger 10 asynchronous requests on a single instance

In [None]:
inferences = []
for i in range(10):  # range(10) sends 10 requests
    response = sm_runtime.invoke_endpoint_async(
        EndpointName=endpoint_name,
        InputLocation=input_s3_location,
        ContentType="application/json")

    output_location = response["OutputLocation"]
    inferences += [(input_s3_location, output_location)]

print("\nAsync invocation results:\n")

for input_file, output_location in inferences:
    output = get_output(output_location)  # blocks until this result is written
    print(f"Input File: {input_file}, Output location: {output_location}")

### 2.4 Enable autoscaling

In [None]:
client = boto3.client('application-autoscaling') # Common class representing Application Auto Scaling for SageMaker amongst other services

resource_id='endpoint/' + endpoint_name + '/variant/' + 'variant1' # This is the format in which application autoscaling references the endpoint

response = client.register_scalable_target(
 ServiceNamespace='sagemaker', 
 ResourceId=resource_id,
 ScalableDimension='sagemaker:variant:DesiredInstanceCount',
 MinCapacity=0, 
 MaxCapacity=5
)

response = client.put_scaling_policy(
 PolicyName='Invocations-ScalingPolicy',
 ServiceNamespace='sagemaker', # The namespace of the AWS service that provides the resource. 
 ResourceId=resource_id, # Scalable target ID in the format endpoint/<endpoint-name>/variant/<variant-name>
 ScalableDimension='sagemaker:variant:DesiredInstanceCount', # SageMaker supports only Instance Count
 PolicyType='TargetTrackingScaling', # 'StepScaling'|'TargetTrackingScaling'
 TargetTrackingScalingPolicyConfiguration={
 'TargetValue': 5.0, # Target for the metric below: average number of queued requests per instance
 'CustomizedMetricSpecification': {
 'MetricName': 'ApproximateBacklogSizePerInstance',
 'Namespace': 'AWS/SageMaker',
 'Dimensions': [
 {'Name': 'EndpointName', 'Value': endpoint_name }
 ],
 'Statistic': 'Average',
 },
 'ScaleInCooldown': 120, # The cooldown period helps you prevent your Auto Scaling group from launching or terminating 
 # additional instances before the effects of previous activities are visible. 
 # You can configure the length of time based on your instance startup time or other application needs.
 # ScaleInCooldown - The amount of time, in seconds, after a scale in activity completes before another scale in activity can start. 
 'ScaleOutCooldown': 120 # ScaleOutCooldown - The amount of time, in seconds, after a scale out activity completes before another scale out activity can start.
 
 #'DisableScaleIn': True # Indicates whether scale in by the target tracking policy is disabled. 
 # If the value is true , scale in is disabled and the target tracking policy won't remove capacity from the scalable resource.
 }
)
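
Target tracking tries to keep `ApproximateBacklogSizePerInstance` near `TargetValue`. As a back-of-the-envelope approximation, the instance count that would bring the metric to the target is roughly the backlog divided by the target, clamped between `MinCapacity` and `MaxCapacity`. This is not the exact Application Auto Scaling algorithm. It ignores cooldowns, evaluation periods, and the service's more conservative scale-in behavior. `approximate_desired_instances` is a hypothetical helper.

```python
import math


def approximate_desired_instances(backlog_size, target_per_instance=5.0,
                                  min_capacity=0, max_capacity=5):
    """Rough instance count implied by a backlog-per-instance target (simplified)."""
    if backlog_size <= 0:
        desired = 0
    else:
        # Enough instances that each holds at most target_per_instance queued requests.
        desired = math.ceil(backlog_size / target_per_instance)
    # Clamp to the MinCapacity/MaxCapacity registered for the scalable target.
    return max(min_capacity, min(max_capacity, desired))


for backlog in (0, 3, 12, 1000):
    print(backlog, approximate_desired_instances(backlog))
```

With the values used here (target 5, maximum 5), a backlog of 1000 requests would call for far more than 5 instances. The endpoint is therefore capped at 5 instances while the queue drains.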

### 2.5 Trigger 1000 asynchronous invocations with autoscaling from 1 to 5 instances, then scale down to 0 on completion

Optionally, unsubscribe from or [delete the SNS topics](https://boto3.amazonaws.com/v1/documentation/api/1.9.42/reference/services/sns.html#SNS.Client.delete_topic) to avoid a flood of notifications from the 1000 invocations below.

In [None]:
inferences = []
for i in range(1000):  # range(1000) sends 1000 requests
    response = sm_runtime.invoke_endpoint_async(
        EndpointName=endpoint_name,
        InputLocation=input_s3_location,
        ContentType="application/json")

    output_location = response["OutputLocation"]
    inferences += [(input_s3_location, output_location)]

print("\nAsync invocation results:\n")

for input_file, output_location in inferences:
    output = get_output(output_location)  # blocks until this result is written
    print(f"Input File: {input_file}, Output location: {output_location}")
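
The loop above waits for the 1000 outputs one at a time. Here is a minimal sketch that fetches them concurrently with the standard library's `ThreadPoolExecutor`. `fetch_all` is a hypothetical helper. Using it with `get_output` assumes that function is safe to call from several threads at once, which you should verify for your SDK version.

```python
from concurrent.futures import ThreadPoolExecutor


def fetch_all(locations, fetch, max_workers=16):
    """Run fetch(location) for every location concurrently; results keep input order."""
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        return list(pool.map(fetch, locations))


def fake_fetch(location):
    # Stand-in for get_output: returns the object key's file name.
    return location.rsplit("/", 1)[-1]


results = fetch_all(
    ["s3://my-example-bucket/output/a.out", "s3://my-example-bucket/output/b.out"],
    fake_fetch,
)
print(results)
```

In the notebook, the call would be `fetch_all([loc for _, loc in inferences], get_output)`.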

Plot graphs from CloudWatch metrics:

In [None]:
import pandas as pd
from datetime import datetime, timedelta
cw = boto3.Session().client("cloudwatch")

def get_sagemaker_metrics(endpoint_name,
                          endpoint_config_name,
                          variant_name,
                          metric_name,
                          statistic,
                          start_time,
                          end_time):
    """Fetch one SageMaker CloudWatch metric as a time-indexed DataFrame."""
    dimensions = [
        {"Name": "EndpointName", "Value": endpoint_name},
        {"Name": "VariantName", "Value": variant_name},
    ]
    if endpoint_config_name is not None:
        dimensions.append({
            "Name": "EndpointConfigName",
            "Value": endpoint_config_name
        })
    metrics = cw.get_metric_statistics(
        Namespace="AWS/SageMaker",
        MetricName=metric_name,
        StartTime=start_time,
        EndTime=end_time,
        Period=120,
        Statistics=[statistic],
        Dimensions=dimensions
    )
    rename = endpoint_config_name if endpoint_config_name is not None else 'ALL'
    return pd.DataFrame(metrics["Datapoints"])\
        .sort_values("Timestamp")\
        .set_index("Timestamp")\
        .drop(["Unit"], axis=1)\
        .rename(columns={statistic: rename})

def plot_endpoint_model_latency_metrics(endpoint_name, endpoint_config_name, variant_name, start_time=None):
 start_time = start_time or datetime.now() - timedelta(minutes=120)
 end_time = datetime.now()
 metric_name = "ModelLatency"
 statistic = "Average"
 metrics_variants = get_sagemaker_metrics(
 endpoint_name,
 endpoint_config_name,
 variant_name,
 metric_name, 
 statistic,
 start_time,
 end_time)
 metrics_variants.plot(title=f"{metric_name}-{statistic}")
 return metrics_variants

In [None]:
model_latency_metrics = plot_endpoint_model_latency_metrics(endpoint_name, None, "variant1")

Similarly, you can plot other CloudWatch metrics associated with the endpoint, such as the queue size and the number of invocations processed.

The instance count scales up to 5 (the configured maximum) as the request backlog grows.

The instance count scales down to 0 once the queue size drops to 0.

## 3. Summary & Clean up

To summarize, in this notebook we learned how to use the SageMaker Asynchronous Inference capability with pre-trained Hugging Face models.

If you enabled autoscaling for your endpoint, deregister the endpoint as a scalable target before deleting the endpoint. To do this, run the following:

In [9]:
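# `client` is the Application Auto Scaling client created in the autoscaling section;
# the SageMaker client has no deregister_scalable_target method.
response = client.deregister_scalable_target(
    ServiceNamespace='sagemaker',
    ResourceId=resource_id,
    ScalableDimension='sagemaker:variant:DesiredInstanceCount'
)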

Remember to delete your endpoint after use, because you are charged for the instances it runs on.

In [None]:
sm_client.delete_endpoint(EndpointName=endpoint_name)

You may also want to delete the other resources you created, such as the endpoint configuration, the model, the SNS topics, and the S3 objects.
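
The clean-up steps can be gathered into one function that takes the boto3 clients as arguments. This is a minimal sketch: `clean_up` and `RecordingClient` are hypothetical. The fake client only records calls so the example runs without AWS. The method names are the boto3 ones used or referenced in this notebook, plus `delete_endpoint_config` and `delete_model` on the SageMaker client. The function does not delete S3 objects.

```python
def clean_up(sm_client, autoscaling_client, sns_client, *, endpoint_name,
             endpoint_config_name, model_name, resource_id, topic_arns=()):
    """Delete the resources this notebook creates, scalable target first."""
    # Remove the scalable target before deleting the endpoint it points to.
    autoscaling_client.deregister_scalable_target(
        ServiceNamespace="sagemaker",
        ResourceId=resource_id,
        ScalableDimension="sagemaker:variant:DesiredInstanceCount",
    )
    sm_client.delete_endpoint(EndpointName=endpoint_name)
    sm_client.delete_endpoint_config(EndpointConfigName=endpoint_config_name)
    sm_client.delete_model(ModelName=model_name)
    for arn in topic_arns:
        sns_client.delete_topic(TopicArn=arn)


class RecordingClient:
    """Test double that records method calls instead of calling AWS."""

    def __init__(self, log):
        self.log = log

    def __getattr__(self, name):
        # Only called for attributes that don't exist, i.e. the API methods.
        def record(**kwargs):
            self.log.append((name, kwargs))
        return record


calls = []
fake = RecordingClient(calls)
clean_up(fake, fake, fake, endpoint_name="ep", endpoint_config_name="ep-config",
         model_name="model", resource_id="endpoint/ep/variant/variant1",
         topic_arns=["arn:aws:sns:us-east-2:123456789012:MyTopic"])
print([name for name, _ in calls])
```

In the notebook, the call would be `clean_up(sm_client, client, sns_client, endpoint_name=endpoint_name, endpoint_config_name=endpoint_config_name, model_name=model_name, resource_id=resource_id, topic_arns=[success_topic, error_topic])`. If you already deregistered the scalable target or deleted the endpoint with the cells above, drop those calls first, because repeating them raises an error.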