# Amazon Augmented AI (Amazon A2I) integration with Amazon Comprehend [Example]

Visit https://github.com/aws-samples/amazon-a2i-sample-jupyter-notebooks for all A2I Sample Notebooks


1. [Introduction](#Introduction)
2. [Prerequisites](#Prerequisites)
    2. [Workteam](#Workteam)
    3. [Permissions](#Notebook-Permission)
3. [Client Setup](#Client-Setup)
4. [Create Control Plane Resources](#Create-Control-Plane-Resources)
    1. [Create Human Task UI](#Create-Human-Task-UI)
    2. [Create Flow Definition](#Create-Flow-Definition)
5. [Starting Human Loops](#Scenario-1-:-When-Activation-Conditions-are-met-,-and-HumanLoop-is-created)
    1. [Wait For Workers to Complete Task](#Wait-For-Workers-to-Complete-Task)
    2. [Check Status of Human Loop](#Check-Status-of-Human-Loop)
    3. [View Task Results](#View-Task-Results)


## Introduction

Amazon Augmented AI (Amazon A2I) makes it easy to build the workflows required for human review of ML predictions. Amazon A2I brings human review to all developers, removing the undifferentiated heavy lifting associated with building human review systems or managing large numbers of human reviewers. 

You can create your own workflows for ML models built on Amazon SageMaker or any other tools. Using Amazon A2I, you can allow human reviewers to step in when a model is unable to make a high confidence prediction or to audit its predictions on an on-going basis. 

Learn more here: https://aws.amazon.com/augmented-ai/

In this tutorial, we will show how you can use **Amazon A2I with AWS Comprehend's Detect Sentiment API.**

For more in depth instructions, visit https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-getting-started.html

To incorporate Amazon A2I into your human review workflows, you need three resources:

* A **worker task template** to create a worker UI. The worker UI displays your input data, such as documents or images, and instructions to workers. It also provides interactive tools that the worker uses to complete your tasks. For more information, see https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-instructions-overview.html

* A **human review workflow**, also referred to as a flow definition. You use the flow definition to configure your human workforce and provide information about how to accomplish the human review task. You can create a flow definition in the Amazon Augmented AI console or with Amazon A2I APIs. To learn more about both of these options, see https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html

* A **human loop** to start your human review workflow. When you use one of the built-in task types, the corresponding AWS service creates and starts a human loop on your behalf when the conditions specified in your flow definition are met or for each object if no conditions were specified. When a human loop is triggered, human review tasks are sent to the workers as specified in the flow definition.

When using a custom task type, as this tutorial will show, you start a human loop using the Amazon Augmented AI Runtime API. When you call StartHumanLoop in your custom application, a task is sent to human reviewers.

### Install Latest SDK

In [None]:
# First, let's get the latest installations of our dependencies
!pip install --upgrade pip
!pip install boto3 --upgrade
!pip install -U botocore

## Setup
We need to set up the following data:
* `region` - Region to call A2I
* `bucket` - A S3 bucket accessible by the given role
    * Used to store the sample images & output results
    * Must be within the same region A2I is called from
* `role` - The IAM role used as part of StartHumanLoop. By default, this notebook will use the execution role
* `workteam` - Group of people to send the work to

In [None]:
# Region
REGION = '<REGION>'

#### Setup Bucket and Paths

In [None]:
import boto3
import botocore

BUCKET = '<YOUR_BUCKET>'
OUTPUT_PATH = f's3://{BUCKET}/a2i-results'

### Role and Permissions

The AWS IAM Role used to execute the notebook needs to have the following permissions:

* ComprehendFullAccess
* SagemakerFullAccess
* S3 Read/Write Access to the BUCKET listed above
* AmazonSageMakerMechanicalTurkAccess (if using MechanicalTurk as your Workforce)

In [None]:
from sagemaker import get_execution_role

# Setting Role to the default SageMaker Execution Role
ROLE = get_execution_role()
display(ROLE)

### Workteam or Workforce

A workforce is the group of workers that you have selected to label your dataset. You can choose either the Amazon Mechanical Turk workforce, a vendor-managed workforce, or you can create your own private workforce for human reviews. Whichever workforce type you choose, Amazon Augmented AI takes care of sending tasks to workers. 

When you use a private workforce, you also create work teams, a group of workers from your workforce that are assigned to Amazon Augmented AI human review tasks. You can have multiple work teams and can assign one or more work teams to each job.

To create your Workteam, visit the instructions here: https://docs.aws.amazon.com/sagemaker/latest/dg/sms-workforce-management.html

After you have created your workteam, replace YOUR_WORKTEAM_ARN below

In [None]:
WORKTEAM_ARN= "<YOUR_WORKTEAM>"

Visit: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-permissions-security.html to add the necessary permissions to your role

## Client Setup

Here we are going to setup the rest of our clients. 

In [None]:
import io
import json
import uuid
import time
import boto3
import botocore

# Amazon SageMaker client
sagemaker = boto3.client('sagemaker', REGION)

# Amazon Comprehend client
comprehend = boto3.client('comprehend', REGION)

# Amazon Augment AI (A2I) client
a2i = boto3.client('sagemaker-a2i-runtime')

s3 = boto3.client('s3', REGION)

### Comprehend helper method

In [None]:
# Will help us parse Detect Sentiment API responses 
def capsToCamel(all_caps_string):
    if all_caps_string == 'POSITIVE':
        return 'Positive'
    elif all_caps_string == 'NEGATIVE': 
        return 'Negative'
    elif all_caps_string == 'NEUTRAL':
        return 'Neutral'

## Create Control Plane Resources

### Create Human Task UI

Create a human task UI resource, giving a UI template in liquid html. This template will be rendered to the human workers whenever human loop is required.

Below we've provided a simple demo template that is compatible with AWS Comprehend's Detect Sentiment API input and response.

For over 70 pre built UIs, check: https://github.com/aws-samples/amazon-a2i-sample-task-uis

In [None]:
template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<crowd-form>
    <crowd-classifier
      name="sentiment"
      categories="['Positive', 'Negative', 'Neutral', 'Mixed']"
      initial-value="{{ task.input.initialValue }}"
      header="What sentiment does this text convey?"
    >
      <classification-target>
        {{ task.input.taskObject }}
      </classification-target>
      
      <full-instructions header="Sentiment Analysis Instructions">
        <p><strong>Positive</strong> sentiment include: joy, excitement, delight</p>
        <p><strong>Negative</strong> sentiment include: anger, sarcasm, anxiety</p>
        <p><strong>Neutral</strong>: neither positive or negative, such as stating a fact</p>
        <p><strong>Mixed</strong>: when the sentiment is mixed</p>
      </full-instructions>

      <short-instructions>
       Choose the primary sentiment that is expressed by the text. 
      </short-instructions>
    </crowd-classifier>
</crowd-form>
"""

def create_task_ui():
    '''
    Creates a Human Task UI resource.

    Returns:
    struct: HumanTaskUiArn
    '''
    response = sagemaker.create_human_task_ui(
        HumanTaskUiName=taskUIName,
        UiTemplate={'Content': template})
    return response

In [None]:
# Task UI name - this value is unique per account and region. You can also provide your own value here.
taskUIName = 'ui-comprehend-' + str(uuid.uuid4()) 

# Create task UI
humanTaskUiResponse = create_task_ui()
humanTaskUiArn = humanTaskUiResponse['HumanTaskUiArn']
print(humanTaskUiArn)

### Creating the Flow Definition

In this section, we're going to create a flow definition definition. Flow Definitions allow us to specify:

* The workforce that your tasks will be sent to.
* The instructions that your workforce will receive. This is called a worker task template.
* The configuration of your worker tasks, including the number of workers that receive a task and time limits to complete tasks.
* Where your output data will be stored.

This demo is going to use the API, but you can optionally create this workflow definition in the console as well. 

For more details and instructions, see: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html.

In [None]:
# Flow definition name - this value is unique per account and region. You can also provide your own value here.
flowDefinitionName = 'fd-comprehend-demo-' + str(uuid.uuid4()) 

create_workflow_definition_response = sagemaker.create_flow_definition(
        FlowDefinitionName= flowDefinitionName,
        RoleArn= ROLE,
        HumanLoopConfig= {
            "WorkteamArn": WORKTEAM_ARN,
            "HumanTaskUiArn": humanTaskUiArn,
            "TaskCount": 1,
            "TaskDescription": "Identify the sentiment of the provided text",
            "TaskTitle": "Detect Sentiment of Text"
        },
        OutputConfig={
            "S3OutputPath" : OUTPUT_PATH
        }
    )
flowDefinitionArn = create_workflow_definition_response['FlowDefinitionArn'] # let's save this ARN for future use

In [None]:
# Describe flow definition - status should be active
for x in range(60):
    describeFlowDefinitionResponse = sagemaker.describe_flow_definition(FlowDefinitionName=flowDefinitionName)
    print(describeFlowDefinitionResponse['FlowDefinitionStatus'])
    if (describeFlowDefinitionResponse['FlowDefinitionStatus'] == 'Active'):
        print("Flow Definition is active")
        break
    time.sleep(2)

## Human Loops

### Detect Sentiment with AWS Comprehend

Now that we have setup our Flow Definition, we are ready to call AWS Comprehend and start our human loops. In this tutorial, we are interested in starting a HumanLoop only if the SentimentScore returned by AWS Comprehend is less than 99%. 

So, with a bit of logic, we can check the response for each call to Detect Sentiment, and if the SentimentScore is less than 99%, we will kick off a HumanLoop to engage our workforce for a human review. 

#### Sample Data

In [None]:
sample_detect_sentiment_blurbs = ['I enjoy this product', 'I am unhappy with this product', 'It is okay', 'sometimes it works']

In [None]:
human_loops_started = []
SENTIMENT_SCORE_THRESHOLD = .99
for blurb in sample_detect_sentiment_blurbs:
    # Call AWS Comprehend's Detect Sentiment API
    response = comprehend.detect_sentiment(Text=blurb, LanguageCode='en')
    sentiment = response['Sentiment']
    
    print(f'Processing blurb: \"{blurb}\"')
    
    # Our condition for when we want to engage a human for review
    if (response['SentimentScore'][capsToCamel(sentiment)]< SENTIMENT_SCORE_THRESHOLD):
    
        humanLoopName = str(uuid.uuid4())
        inputContent = {
            "initialValue": sentiment.title(),
            "taskObject": blurb
        }
        start_loop_response = a2i.start_human_loop(
            HumanLoopName=humanLoopName,
            FlowDefinitionArn=flowDefinitionArn,
            HumanLoopInput={
                "InputContent": json.dumps(inputContent)
            }
        )
        human_loops_started.append(humanLoopName)
        print(f'SentimentScore of {response["SentimentScore"][capsToCamel(sentiment)]} is less than the threshold of {SENTIMENT_SCORE_THRESHOLD}')
        print(f'Starting human loop with name: {humanLoopName}  \n')
    else:
        print(f'SentimentScore of {response["SentimentScore"][capsToCamel(sentiment)]} is above threshold of {SENTIMENT_SCORE_THRESHOLD}')
        print('No human loop created. \n')

### Check Status of Human Loop

In [None]:
completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f'HumanLoop Name: {human_loop_name}')
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print('\n')
    
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)

### Wait For Workers to Complete Task

In [None]:
workteamName = WORKTEAM_ARN[WORKTEAM_ARN.rfind('/') + 1:]
print("Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!")
print('https://' + sagemaker.describe_workteam(WorkteamName=workteamName)['Workteam']['SubDomain'])

### Check Status of Human Loop Again

In [None]:
completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i.describe_human_loop(HumanLoopName=human_loop_name)
    print(f'HumanLoop Name: {human_loop_name}')
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print('\n')
    
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)

### View Task Results  

Once work is completed, Amazon A2I stores results in your S3 bucket and sends a Cloudwatch event. Your results should be available in the S3 OUTPUT_PATH when all work is completed.

In [None]:
import re
import pprint

pp = pprint.PrettyPrinter(indent=4)

for resp in completed_human_loops:
    splitted_string = re.split('s3://' +  BUCKET + '/', resp['HumanLoopOutput']['OutputS3Uri'])
    output_bucket_key = splitted_string[1]

    response = s3.get_object(Bucket=BUCKET, Key=output_bucket_key)
    content = response["Body"].read()
    json_output = json.loads(content)
    pp.pprint(json_output)
    print('\n')

### The End!