# Workshop - Human in the Loop for SageMaker Models - Module 2

In this module, you will learn how to use **Amazon A2I with an Amazon SageMaker hosted endpoint**.

Amazon Augmented AI (Amazon A2I) makes it easy to build the workflows required for human review of ML predictions. Amazon A2I brings human review to all developers, removing the undifferentiated heavy lifting associated with building human review systems or managing large numbers of human reviewers. 

You can create your own workflows for ML models built on Amazon SageMaker or any other tools. Using Amazon A2I, you can allow human reviewers to step in when a model is unable to make a high-confidence prediction, or to audit its predictions on an ongoing basis.

## Setup

In [None]:
import os
import io
import uuid
import time
import random
import numpy as np
import json
import boto3
import botocore
import sagemaker
from sagemaker import get_execution_role
import matplotlib.pyplot as plt
import matplotlib.patches as patches    
import matplotlib.image as mpimg

We need to set up the following values:
* `region` - The AWS Region in which to call A2I.
* `BUCKET` - An S3 bucket accessible by the given role
    * Used to store the sample images and output results
    * Must be in the same Region that A2I is called from
* `role` - The IAM role used as part of StartHumanLoop. By default, this notebook uses the execution role.
* `WORKFORCE_ARN` - The ARN of the workteam (the group of people) to send the work to

In [None]:
region = 'us-east-1'
role = get_execution_role()
sess = sagemaker.Session()
BUCKET = sess.default_bucket()
OUTPUT_PATH = f's3://{BUCKET}/a2i-results'
MODEL_PATH = f's3://{BUCKET}/model'
WORKFORCE_ARN = '' # CREATE THE WORKFORCE BEFORE RUNNING THE NEXT CELLS
endpoint_name = 'DEMO-ObjectDetection-endpoint-pretrained'

## Load the Model and Create an Endpoint

For the purposes of this workshop, we are going to deploy a pre-trained model.

In [None]:
# download pre-trained model
source_model_data_s3_uri = 's3://aws-sagemaker-augmented-ai-example/model/model.tar.gz'

!aws s3 cp {source_model_data_s3_uri} {MODEL_PATH}/model.tar.gz

model_data_s3_uri = f'{MODEL_PATH}/model.tar.gz'

image = sagemaker.image_uris.retrieve('object-detection', region, version='1')

model = sagemaker.model.Model(image, 
                              model_data = model_data_s3_uri,
                              role = role,
                              predictor_cls = sagemaker.predictor.Predictor,
                              sagemaker_session = sess)

object_detector = model.deploy(initial_instance_count = 1,
                               instance_type = 'ml.m5.xlarge',
                               endpoint_name = endpoint_name,
                               serializer = sagemaker.serializers.IdentitySerializer('image/jpeg'))

## Test Endpoint

### Object Detection Helper Functions

In [None]:
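def visualize_detection(img_file, dets, classes=(), thresh=0.6):
    """
    Visualize detections in one image.
    Parameters:
    ----------
    img_file : str
        path to the image file
    dets : numpy.array or list
        SSD detections, [[id, score, x0, y0, x1, y1], ...];
        each row is one object, with box corners normalized to [0, 1]
    classes : tuple or list of str
        class names
    thresh : float
        score threshold below which boxes are not drawn
    Returns:
    -------
    (figure, output), where output lists [class_name, score] for every
    detection, including those below thresh
    """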
    img=mpimg.imread(img_file)
    f, ax = plt.subplots(1, 1)
    ax.imshow(img)
    height = img.shape[0]
    width = img.shape[1]
    colors = dict()
    output = []
    for det in dets:
        (klass, score, x0, y0, x1, y1) = det
        cls_id = int(klass)
        class_name = str(cls_id)
        if classes and len(classes) > cls_id:
            class_name = classes[cls_id]
        output.append([class_name, score])
        if score < thresh:
            continue
        if cls_id not in colors:
            colors[cls_id] = (random.random(), random.random(), random.random())
        xmin = int(x0 * width)
        ymin = int(y0 * height)
        xmax = int(x1 * width)
        ymax = int(y1 * height)
        rect = patches.Rectangle((xmin, ymin), xmax - xmin,
                                 ymax - ymin, fill=False,
                                 edgecolor=colors[cls_id],
                                 linewidth=3.5)
        ax.add_patch(rect)

        ax.text(xmin, ymin - 2,
                '{:s} {:.3f}'.format(class_name, score),
                bbox=dict(facecolor=colors[cls_id], alpha=0.5),
                fontsize=12, color='white')

    return f, output
    
def load_and_predict(file_name, predictor, threshold=0.5):
    """
    Load an image, send it to a predictor for object detection, and visualize the detections.
    Parameters:
    ----------
    file_name : str
        path to the image file
    predictor : sagemaker.predictor.Predictor
        a predictor attached to the hosted endpoint
    threshold : float
        score threshold for bounding box display
    """
    with open(file_name, 'rb') as image:
        f = image.read()
        b = bytearray(f)
    results = predictor.predict(b)
    detections = json.loads(results)
    
    fig, detection_filtered = visualize_detection(file_name, detections['prediction'], 
                                                   object_categories, threshold)
    
    return results, detection_filtered, fig
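
The detection rows described in the docstrings carry box corners normalized to the image size. As a minimal sketch of the scaling that `visualize_detection` performs, the hypothetical helper `to_pixel_box` below (not part of the notebook) converts one row to pixel coordinates. The sample row is made up for illustration.

```python
def to_pixel_box(det, width, height):
    """Convert one [class_id, score, x0, y0, x1, y1] row to pixel coordinates.

    Hypothetical helper for illustration; it mirrors the arithmetic in
    visualize_detection and assumes the corners are normalized to [0, 1].
    Returns (class_id, score, (xmin, ymin, box_width, box_height)).
    """
    class_id, score, x0, y0, x1, y1 = det
    xmin, ymin = int(x0 * width), int(y0 * height)
    xmax, ymax = int(x1 * width), int(y1 * height)
    return int(class_id), score, (xmin, ymin, xmax - xmin, ymax - ymin)


# Made-up detection: class 17 covering the central half of a 640x480 image.
example = to_pixel_box([17.0, 0.465, 0.25, 0.25, 0.75, 0.75], width=640, height=480)
print(example)  # (17, 0.465, (160, 120, 320, 240))
```

The `(xmin, ymin, width, height)` tuple is the same form that `patches.Rectangle` expects for its anchor point and size.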

In [None]:
object_categories = ['aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car', 
                     'cat', 'chair', 'cow', 'diningtable', 'dog', 'horse', 'motorbike', 
                     'person', 'pottedplant', 'sheep', 'sofa', 'train', 'tvmonitor']

In [None]:
test_photos_index = ['980382', '276517', '1571457']

if not os.path.isdir('sample-a2i-images'):
    os.mkdir('sample-a2i-images')
    
for ind in test_photos_index:
    !curl https://images.pexels.com/photos/{ind}/pexels-photo-{ind}.jpeg > sample-a2i-images/pexels-photo-{ind}.jpeg

In [None]:
test_photos = ['sample-a2i-images/pexels-photo-980382.jpeg', # motorcycle
               'sample-a2i-images/pexels-photo-276517.jpeg', # bicycle
               'sample-a2i-images/pexels-photo-1571457.jpeg'] # sofa

In [None]:
results = load_and_predict(test_photos[2], object_detector, threshold=0.2)[0]

In [None]:
dict_results = json.loads(results.decode('utf8'))['prediction']

In [None]:
visualize_detection(test_photos[2], dict_results, object_categories, thresh=0.4)[0]

A probability of 0.465 is quite low for a modern computer vision model, and the object is also mislabeled. This is because the SSD model was under-trained for demonstration purposes. However, this under-trained model is a good example of when to bring in human reviewers: the model is unable to make a high-confidence prediction.

## Creating Human Review

In [None]:
sagemaker_client = boto3.client('sagemaker', region)
a2i = boto3.client('sagemaker-a2i-runtime', region)  # use the same Region as the other clients
s3 = boto3.client('s3', region)

# Flow definition name - this value must be unique per account and Region. You can also provide your own value here.
flowDefinitionName = 'fd-sagemaker-object-detection-demo'
# Task UI name - this value must be unique per account and Region. You can also provide your own value here.
taskUIName = 'ui-sagemaker-object-detection-demo'

### Create Human Task UI

Create a human task UI resource by providing a UI template written in Liquid HTML. This template is rendered to the human workers whenever a human loop is required.

For pre-built UIs, check: https://github.com/aws-samples/amazon-a2i-sample-task-uis.

We will be taking an [object detection UI](https://github.com/aws-samples/amazon-a2i-sample-task-uis/blob/master/images/bounding-box.liquid.html) and filling in the object categories in the `labels` variable in the template.
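
Because the class names appear both in the notebook's `object_categories` list and in the template's `labels` attribute, the two can drift apart. The sketch below shows one possible way to render the attribute value from a Python list. `labels_attribute` is a hypothetical helper, not part of the sample UI repository, and it assumes that no class name contains a quote character.

```python
def labels_attribute(categories):
    """Render class names as the single-quoted list used in the `labels` attribute.

    Hypothetical helper for illustration; assumes no name contains a quote.
    """
    return "[" + ", ".join(f"'{name}'" for name in categories) + "]"


# Shortened class list for the example; the notebook's list has 20 entries.
sample_categories = ['aeroplane', 'bicycle', 'bird']
print(labels_attribute(sample_categories))  # ['aeroplane', 'bicycle', 'bird']
```

If you substitute the result into the template, a plain `str.replace` on a placeholder of your choosing is likely safer than `str.format`, since the Liquid `{{ ... }}` expressions would otherwise be treated as escaped braces.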

In [None]:
template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<crowd-form>
  <crowd-bounding-box
    name="annotatedResult"
    src="{{ task.input.taskObject | grant_read_access }}"
    header="Draw bounding boxes around all the objects in this image"
    labels="['aeroplane', 'bicycle', 'bird', 'boat', 'bottle', 'bus', 'car', 'cat', 'chair', 'cow', 'diningtable', 'dog', 'horse', 'motorbike', 'person', 'pottedplant', 'sheep', 'sofa', 'train', 'tvmonitor']"
  >
    <full-instructions header="Bounding Box Instructions" >
      <p>Use the bounding box tool to draw boxes around the requested target of interest:</p>
      <ol>
        <li>Draw a rectangle using your mouse over each instance of the target.</li>
        <li>Make sure the box does not cut into the target, leave a 2 - 3 pixel margin</li>
        <li>
          When targets are overlapping, draw a box around each object,
          include all contiguous parts of the target in the box.
          Do not include parts that are completely overlapped by another object.
        </li>
        <li>
          Do not include parts of the target that cannot be seen,
          even though you think you can interpolate the whole shape of the target.
        </li>
        <li>Avoid shadows, they're not considered as a part of the target.</li>
        <li>If the target goes off the screen, label up to the edge of the image.</li>
      </ol>
    </full-instructions>

    <short-instructions>
      Draw boxes around the requested target of interest.
    </short-instructions>
  </crowd-bounding-box>
</crowd-form>
"""

def create_task_ui():
    '''
    Creates a Human Task UI resource.

    Returns:
    struct: HumanTaskUiArn
    '''
    response = sagemaker_client.create_human_task_ui(
        HumanTaskUiName=taskUIName,
        UiTemplate={'Content': template})
    return response

In [None]:
# Create task UI
humanTaskUiResponse = create_task_ui()
humanTaskUiArn = humanTaskUiResponse['HumanTaskUiArn']
print(humanTaskUiArn)

### Create the Flow Definition

In this section, we're going to create a flow definition. Flow definitions allow us to specify:

* The workforce that your tasks will be sent to.
* The instructions that your workforce will receive. This is called a worker task template.
* The configuration of your worker tasks, including the number of workers that receive a task and time limits to complete tasks.
* Where your output data will be stored.

This demo is going to use the API, but you can optionally create this workflow definition in the console as well. 
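
As a rough sketch of how the four items above map onto the request, the hypothetical helper `build_flow_definition_request` assembles the keyword arguments for `create_flow_definition` without calling AWS. The ARNs, the bucket name, and the one-hour `TaskTimeLimitInSeconds` value are placeholders chosen for illustration, not values from this workshop.

```python
def build_flow_definition_request(name, role_arn, workteam_arn, task_ui_arn,
                                  output_path, task_count=1,
                                  task_time_limit_seconds=3600):
    """Assemble keyword arguments for SageMaker's create_flow_definition call.

    Hypothetical helper; the one-hour time limit is an assumed example value.
    """
    if not workteam_arn:
        raise ValueError("workteam_arn is empty; create the workteam first")
    return {
        "FlowDefinitionName": name,
        "RoleArn": role_arn,
        "HumanLoopConfig": {
            "WorkteamArn": workteam_arn,        # who receives the tasks
            "HumanTaskUiArn": task_ui_arn,      # the worker task template
            "TaskCount": task_count,            # number of workers per task
            "TaskTimeLimitInSeconds": task_time_limit_seconds,
            "TaskTitle": "Object detection a2i demo",
            "TaskDescription": "Identify and locate the object in an image.",
        },
        "OutputConfig": {"S3OutputPath": output_path},  # where results are stored
    }


# Placeholder identifiers, for illustration only.
request = build_flow_definition_request(
    name="fd-sagemaker-object-detection-demo",
    role_arn="arn:aws:iam::111122223333:role/example-role",
    workteam_arn="arn:aws:sagemaker:us-east-1:111122223333:workteam/private-crowd/example-team",
    task_ui_arn="arn:aws:sagemaker:us-east-1:111122223333:human-task-ui/example-ui",
    output_path="s3://example-bucket/a2i-results",
)
print(sorted(request["HumanLoopConfig"]))
```

The resulting dictionary could then be passed as `sagemaker_client.create_flow_definition(**request)`. Failing early on an empty workteam ARN avoids a less obvious validation error from the service.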

In [None]:
create_workflow_definition_response = sagemaker_client.create_flow_definition(
        FlowDefinitionName= flowDefinitionName,
        RoleArn= role,
        HumanLoopConfig= {
            "WorkteamArn": WORKFORCE_ARN,
            "HumanTaskUiArn": humanTaskUiArn,
            "TaskCount": 1,
            "TaskDescription": "Identify and locate the object in an image.",
            "TaskTitle": "Object detection a2i demo"
        },
        OutputConfig={
            "S3OutputPath" : OUTPUT_PATH
        }
    )
flowDefinitionArn = create_workflow_definition_response['FlowDefinitionArn'] # let's save this ARN for future use

In [None]:
# Describe flow definition - status should be active
for x in range(60):
    describeFlowDefinitionResponse = sagemaker_client.describe_flow_definition(FlowDefinitionName=flowDefinitionName)
    print(describeFlowDefinitionResponse['FlowDefinitionStatus'])
    if (describeFlowDefinitionResponse['FlowDefinitionStatus'] == 'Active'):
        print("Flow Definition is active")
        break
    time.sleep(2)
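
The loop above exits silently if the flow definition never becomes active. A minimal sketch of a stricter variant follows: `wait_for_status` is a hypothetical helper that polls any zero-argument callable and raises on a terminal failure or a timeout. The example drives it with a stand-in function rather than a real API call.

```python
import time


def wait_for_status(describe, target="Active", failed=("Failed",),
                    attempts=60, delay_seconds=2, sleep=time.sleep):
    """Poll describe() until it returns the target status.

    describe is any zero-argument callable that returns a status string.
    Hypothetical helper for illustration.
    """
    for _ in range(attempts):
        status = describe()
        if status == target:
            return status
        if status in failed:
            raise RuntimeError(f"Reached terminal status {status!r}")
        sleep(delay_seconds)
    raise TimeoutError(f"Status was not {target!r} after {attempts} attempts")


# Stand-in for the real API call: 'Initializing' twice, then 'Active'.
fake_statuses = iter(["Initializing", "Initializing", "Active"])
result = wait_for_status(lambda: next(fake_statuses), sleep=lambda seconds: None)
print(result)  # Active
```

With the notebook's client, the callable could be `lambda: sagemaker_client.describe_flow_definition(FlowDefinitionName=flowDefinitionName)['FlowDefinitionStatus']`.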

Now that we have set up our flow definition, we are ready to call our object detection endpoint on SageMaker and start our human loops. In this tutorial, we are interested in starting a HumanLoop only if the highest prediction probability score returned by our model for the detected objects is less than 50%.

So, with a bit of logic, we can check the response for each call to the SageMaker endpoint using the `load_and_predict` helper function, and if the highest score is less than 50%, we will kick off a HumanLoop to engage our workforce for a human review.
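
The triggering rule described here can be sketched in isolation. `needs_human_review` is a hypothetical helper that takes `[class_name, score]` pairs, in the shape returned by `visualize_detection`, and reports whether the best score falls below the threshold. Treating an empty detection list as needing review is an assumption of this sketch, and the scores are made up.

```python
def needs_human_review(detections, threshold=0.5):
    """Return (review_needed, best_detection) for [class_name, score] pairs.

    Hypothetical helper. Sending an empty detection list to review is an
    assumption of this sketch, not behavior taken from the notebook.
    """
    if not detections:
        return True, None
    best = max(detections, key=lambda item: item[1])
    return best[1] < threshold, best


# Made-up scores for illustration.
low = needs_human_review([["sofa", 0.465], ["chair", 0.21]])
high = needs_human_review([["motorbike", 0.93], ["person", 0.4]])
print(low)   # (True, ['sofa', 0.465])
print(high)  # (False, ['motorbike', 0.93])
```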

In [None]:
# Upload the sample images to the S3 bucket so the A2I worker UI can display them
!aws s3 sync ./sample-a2i-images/ s3://{BUCKET}/a2i-results/sample-a2i-images/
    
human_loops_started = []
SCORE_THRESHOLD = .50
for fname in test_photos:
    # Call the SageMaker endpoint; boxes with a score below 0.4 are not drawn.
    response, score_filtered, fig = load_and_predict(fname, object_detector, threshold=0.4)
    
    # Sort by prediction score so that the first item has the highest probability
    score_filtered.sort(key=lambda x: x[1], reverse=True)

    # Our condition for triggering a human review
    if (score_filtered[0][1] < SCORE_THRESHOLD):
        s3_fname='s3://%s/a2i-results/%s' % (BUCKET, fname)
        print(s3_fname)
        humanLoopName = str(uuid.uuid4())
        inputContent = {
            "initialValue": score_filtered[0][0],
            "taskObject": s3_fname # the s3 object will be passed to the worker task UI to render
        }
        # start an a2i human review loop with an input
        start_loop_response = a2i.start_human_loop(
            HumanLoopName=humanLoopName,
            FlowDefinitionArn=flowDefinitionArn,
            HumanLoopInput={
                "InputContent": json.dumps(inputContent)
            }
        )
        human_loops_started.append(humanLoopName)
        print(f'Top detection "{score_filtered[0][0]}" scored {score_filtered[0][1]:.3f}, below the threshold of {SCORE_THRESHOLD:.2f}')
        print(f'Starting human loop with name: {humanLoopName}  \n')
    else:
        print(f'Top detection "{score_filtered[0][0]}" scored {score_filtered[0][1]:.3f}, at or above the threshold of {SCORE_THRESHOLD:.2f}')
        print('No human loop created. \n')

### Check Status of Human Loop

In [None]:
completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f'HumanLoop Name: {human_loop_name}')
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print('\n')
    
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)

Since we are using a private workteam, we should go to the labeling UI to perform the inspection ourselves.

In [None]:
workteamName = WORKFORCE_ARN[WORKFORCE_ARN.rfind('/') + 1:]
print("Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!")
print('https://' + sagemaker_client.describe_workteam(WorkteamName=workteamName)['Workteam']['SubDomain'])
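
If `WORKFORCE_ARN` was left empty, the slicing above yields an empty workteam name and the failure only surfaces in the API call. As a small sketch, the hypothetical helper `workteam_name_from_arn` extracts the name and fails early instead. It assumes the ARN ends in `workteam/<crowd>/<name>`, and the example ARN is a placeholder.

```python
def workteam_name_from_arn(workteam_arn):
    """Return the last path segment of a workteam ARN.

    Hypothetical helper; assumes the 'arn:...:workteam/<crowd>/<name>' layout.
    """
    if not workteam_arn or "/" not in workteam_arn:
        raise ValueError(f"Not a workteam ARN: {workteam_arn!r}")
    return workteam_arn.rsplit("/", 1)[1]


# Placeholder ARN for illustration only.
example_arn = "arn:aws:sagemaker:us-east-1:111122223333:workteam/private-crowd/example-team"
print(workteam_name_from_arn(example_arn))  # example-team
```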

### Check Status of Human Loop Again

In [None]:
completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f'HumanLoop Name: {human_loop_name}')
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print('\n')
    
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)

### View Task Results  

Once work is completed, Amazon A2I stores the results in your S3 bucket and sends a CloudWatch event. Your results should be available in the S3 `OUTPUT_PATH` when all work is completed. Note that the human answer (the label and the bounding box) is returned and saved in the JSON file.
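
To give a feel for how the saved answers might be consumed, the sketch below flattens the bounding boxes out of an output document. The key names (`humanAnswers`, `answerContent`, `boundingBoxes`, and the box fields) are an assumption about the output shape and should be checked against a real output file. `annotatedResult` matches the `name` attribute used in the template above. `summarize_boxes` and the sample document are hypothetical.

```python
def summarize_boxes(a2i_output):
    """List (label, left, top, width, height) for each box in each human answer.

    Hypothetical helper; the key names are assumed, so verify them against
    an actual output file before relying on this.
    """
    boxes = []
    for answer in a2i_output.get("humanAnswers", []):
        result = answer.get("answerContent", {}).get("annotatedResult", {})
        for box in result.get("boundingBoxes", []):
            boxes.append((box["label"], box["left"], box["top"],
                          box["width"], box["height"]))
    return boxes


# Hand-written stand-in for a downloaded output file (all values are made up).
sample_output = {
    "humanLoopName": "example-loop",
    "humanAnswers": [{
        "answerContent": {
            "annotatedResult": {
                "boundingBoxes": [
                    {"label": "sofa", "left": 40, "top": 120,
                     "width": 300, "height": 180}
                ]
            }
        }
    }],
}
print(summarize_boxes(sample_output))  # [('sofa', 40, 120, 300, 180)]
```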

In [None]:
import re
import pprint

pp = pprint.PrettyPrinter(indent=4)

for resp in completed_human_loops:
    # Escape the bucket name so characters such as '.' are matched literally
    splitted_string = re.split('s3://' + re.escape(BUCKET) + '/', resp['HumanLoopOutput']['OutputS3Uri'])
    output_bucket_key = splitted_string[1]

    response = s3.get_object(Bucket=BUCKET, Key=output_bucket_key)
    content = response["Body"].read()
    json_output = json.loads(content)
    pp.pprint(json_output)
    print('\n')
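
Splitting the output URI into a bucket and a key can also be done without a regular expression. The sketch below uses the standard library's `urlparse`. `split_s3_uri` is a hypothetical helper, and the URI is a placeholder rather than a real A2I output path.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    """Split 's3://bucket/key' into (bucket, key).

    Hypothetical helper. Keys containing '?' or '#' are not handled, because
    urlparse would treat them as query or fragment separators.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


# Placeholder URI for illustration only.
bucket, key = split_s3_uri("s3://example-bucket/a2i-results/example-loop/output.json")
print(bucket)  # example-bucket
print(key)     # a2i-results/example-loop/output.json
```

The returned pair can be passed directly as the `Bucket` and `Key` arguments of `s3.get_object`.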

## Clean Up (After Finishing Module 3)

In [None]:
# Uncomment to delete the endpoint once you have finished Module 3
# object_detector.delete_endpoint()