In [5]:
import botocore
import sagemaker
import boto3
import io
import json
import uuid
import time



# Environment SetupÂ¶

We need to set up the following data:

REGION - Region to call A2I.

BUCKET_NAME - A S3 bucket accessible by the given role
    Used to store the input files and output results
    Must be within the same region A2I is called from
    
WORKTEAM_ARN - To create your Private Workteam, visit the instructions here: https://docs.aws.amazon.com/sagemaker/latest/dg/sms-workforce-private.html After you have created your workteam, replace \<YOUR-WORKTEAM-ARN> below

ENDPOINT_ARN- Comprehend Custom Classifier endpoint.
For instructions to create a Comprehend Custom Classifier and its endpoint refer to this blog:
https://aws.amazon.com/blogs/machine-learning/active-learning-workflow-for-amazon-comprehend-custom-classification-models-part-1/

ROLE - The IAM role used as part of StartHumanLoop. By default, this notebook will use the execution role. You can learn more about IAM Policies here https://docs.aws.amazon.com/IAM/latest/UserGuide/access_policies.html



In [None]:
# REGION = 'us-east-1'
WORKTEAM_ARN= "<Your worker team ARN>"
BUCKET = "<Your S3 bucket>"
ENDPOINT_ARN='<Your Custom Classification Endpoint>'
role = sagemaker.get_execution_role()
region = boto3.session.Session().region_name
prefix = "custom-classify" + str(uuid.uuid1())

# Client Setup

Let's setup the clients for Amazon SageMaker, Amazon A2I Runtime and Amazon Comprehend.


In [None]:
# Amazon SageMaker client
sagemaker = boto3.client('sagemaker', REGION)
# A2I Runtime client
a2i_runtime_client = boto3.client('sagemaker-a2i-runtime', REGION)
#Comprehend Client
comprehend = boto3.client('comprehend',REGION)

In [7]:

import pprint

# Pretty print setup
pp = pprint.PrettyPrinter(indent=2)

# Function to pretty-print AWS SDK responses
def print_response(response):
    if 'ResponseMetadata' in response:
        del response['ResponseMetadata']
    pp.pprint(response)



# Setting up an Amazon A2I human loop

In this section, you set up a human review loop for low-confidence detection in Amazon A2I. It includes the following steps:

    Step 1. Create a Worker Task template.
    Step 2. Create a Human review workflow.
    Step 3. Creating and Starting A2I human loop
    Step 4. Check the human loop status and start labelling

# Step 1. Create a Worker Task template

It is 2 step process:
    
    1. Select the UI template you want to use For over 70 pre built UIs, check: https://github.com/aws-samples/amazon-a2i-sample-task-uis
    
    2. Create Task template using create_human_task_ui API or you can do the same thing using the AWS Console.
Refer to this blog to follow AWS Console steps: https://aws.amazon.com/blogs/machine-learning/active-learning-workflow-for-amazon-comprehend-custom-classification-models-part-1/



In [46]:
#1. Select the UI template for custom classification and modify the categories based on your labels
template = """<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<crowd-form>
    <crowd-classifier-multi-select
      name="category"
      categories="['World', 'SciTech', 'Business','Sports']"
      header="Select the relevant categories"
    >
      <classification-target>
        {{ task.input.taskObject }}
      </classification-target>
      
      <full-instructions header="Text Categorization Instructions">
        <p><strong>World</strong>Related to world events</p>
        <p><strong>SciTech</strong>Anything realted to science or Technology</p>
        <p><strong>Business</strong>Anything related to driving business</p>
         <p><strong>Sports</strong>Anything related to sports</p>
      </full-instructions>

      <short-instructions>
       Choose all categories that are expressed by the text. 
      </short-instructions>
    </crowd-classifier-multi-select>
</crowd-form>
"""

# Create a worker task template using boto3 API

response = client.create_human_task_ui(
    
    HumanTaskUiName='string',
    
    UiTemplate={
        'Content': 'string'
    },
    
    Tags=[
        {
            'Key': 'string',
            'Value': 'string'
        },
    ]
)

https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_human_task_ui

In [21]:
def create_task_ui():
    '''
    Creates a Human Task UI resource.

    Returns:
    struct: HumanTaskUiArn
    '''
    response = sagemaker.create_human_task_ui(
        HumanTaskUiName=taskUIName,
        UiTemplate={'Content': template})
    return response

In [22]:
# Task UI name - this value is unique per account and region. You can also provide your own value here.
taskUIName = prefix + '-ui' 

# Create task UI
humanTaskUiResponse = create_task_ui()
humanTaskUiArn = humanTaskUiResponse['HumanTaskUiArn']
print(humanTaskUiArn)

arn:aws:sagemaker:us-east-1:186389221476:human-task-ui/custom-classify2c31660c-fcec-11ea-b54e-0f3f86ad49f5-ui


# Step 2. Create a Human review workflow
Use an Amazon Augmented AI (Amazon A2I) human review workflow, or flow definition, to specify the following: 
  

     The workforce that your tasks will be sent to.

     The instructions that your workforce will receive, which is called a worker task template.

     The configuration of your worker tasks, including the number of workers that receive a task and time limits to complete tasks.

     Where your output data will be stored.
        
To create a flow definition using the SageMaker API, you use the CreateFlowDefinition operation

This demo is going to use the API, but you can optionally create this workflow definition in the console as well.

For more details and instructions, see: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html.


In [23]:


# Flow definition name - this value is unique per account and region. You can also provide your own value here.
flowDefinitionName = prefix + '-fd-a2i' 

create_workflow_definition_response = sagemaker.create_flow_definition(
        FlowDefinitionName= flowDefinitionName,
        RoleArn= role,
        HumanLoopConfig= {
            "WorkteamArn": WORKTEAM_ARN,
            "HumanTaskUiArn": humanTaskUiArn,
            "TaskCount": 1,
            "TaskDescription": "Read the instructions",
            "TaskTitle": "Classify the text"
        },
        OutputConfig={
            "S3OutputPath" : "s3://"+BUCKET+"/output"
        }
    )
flowDefinitionArn = create_workflow_definition_response['FlowDefinitionArn'] # let's save this ARN for future use





In [24]:
# Describe flow definition - status should be active
for x in range(60):
    describeFlowDefinitionResponse = sagemaker.describe_flow_definition(FlowDefinitionName=flowDefinitionName)
    print(describeFlowDefinitionResponse['FlowDefinitionStatus'])
    if (describeFlowDefinitionResponse['FlowDefinitionStatus'] == 'Active'):
        print("Flow Definition is active")
        break
    time.sleep(2)

Active
Flow Definition is active


In [25]:


print(flowDefinitionArn)



arn:aws:sagemaker:us-east-1:186389221476:flow-definition/custom-classify2c31660c-fcec-11ea-b54e-0f3f86ad49f5-fd-a2i


# Sample Data to Test Comprehend Endpoint and create a request for A2I

In [26]:
sentence='MS Dhoni retired from cricket'
response = comprehend.classify_document(
    Text= sentence,
    EndpointArn=ENDPOINT_ARN
)
print(response)
p = response['Classes'][0]['Name']
score = response['Classes'][0]['Score']
        #print(f"S:{sentence}, Score:{score}")
response = {}
response['utterance']=sentence
response['prediction']=p
response['confidence'] = score
print(response)

{'Classes': [{'Name': 'SPORTS', 'Score': 0.7070213556289673}, {'Name': 'WORLD', 'Score': 0.1983727365732193}, {'Name': 'SCI_TECH', 'Score': 0.07924257218837738}], 'ResponseMetadata': {'RequestId': '319d6c0a-02d1-48a6-a6c8-ebe061b50393', 'HTTPStatusCode': 200, 'HTTPHeaders': {'x-amzn-requestid': '319d6c0a-02d1-48a6-a6c8-ebe061b50393', 'content-type': 'application/x-amz-json-1.1', 'content-length': '150', 'date': 'Tue, 22 Sep 2020 17:08:18 GMT'}, 'RetryAttempts': 0}}
{'utterance': 'MS Dhoni retired from cricket', 'prediction': 'SPORTS', 'confidence': 0.7070213556289673}


# Step 3. Creating and Starting A2I human loop 
For more information https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-start-human-loop.html#a2i-instructions-starthumanloop

When using Amazon A2I for a custom task, a human loops starts when StartHumanLoop is called in your application. 
Prerequisites

To complete this procedure, you need:

    Input data formatted as a string representation of a JSON-formatted file.

    The Amazon Resource Name (ARN) of your flow definition


response = client.start_human_loop(
    
    HumanLoopName='string',
    
    FlowDefinitionArn='string',
    
    HumanLoopInput={
        'InputContent': 'string'
    },
    
    DataAttributes={
        'ContentClassifiers': [
            'FreeOfPersonallyIdentifiableInformation'|'FreeOfAdultContent',
        ]
    }
)



In [27]:

human_loops_started = []

import json

CONFIDENCE_SCORE_THRESHOLD = .30


if(response['confidence'] > CONFIDENCE_SCORE_THRESHOLD):
        humanLoopName = str(uuid.uuid4())
        human_loop_input = {}
  
        human_loop_input['taskObject'] = response['utterance']
        start_loop_response = a2i_runtime_client.start_human_loop(
        HumanLoopName=humanLoopName,
        FlowDefinitionArn=flowDefinitionArn,
        HumanLoopInput={
                "InputContent": json.dumps(human_loop_input)
            }
        )
        print(human_loop_input)
        human_loops_started.append(humanLoopName)
        print(f'Score is less than the threshold of {CONFIDENCE_SCORE_THRESHOLD}')
        print(f'Starting human loop with name: {humanLoopName}  \n')
else:
         print('No human loop created. \n')

{'taskObject': 'MS Dhoni retired from cricket'}
Score is less than the threshold of 0.3
Starting human loop with name: c6bc21a2-cfa9-41dd-b28d-90264c6b8f0a  



# Display status of the Human Loop

In [28]:


completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i_runtime_client.describe_human_loop(HumanLoopName=human_loop_name)
    print(f'HumanLoop Name: {human_loop_name}')
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print('\n')
    
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)



HumanLoop Name: c6bc21a2-cfa9-41dd-b28d-90264c6b8f0a
HumanLoop Status: InProgress
HumanLoop Output Destination: {'OutputS3Uri': 's3://comprehend-demo0804/output/custom-classify2c31660c-fcec-11ea-b54e-0f3f86ad49f5-fd-a2i/2020/09/22/17/08/20/c6bc21a2-cfa9-41dd-b28d-90264c6b8f0a/output.json'}




# Navigate to the private worker portal and start Labelling!
Make sure you've invited yourself to your workteam!

In [29]:
workteamName = WORKTEAM_ARN[WORKTEAM_ARN.rfind('/') + 1:]
print("Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!")
print('https://' + sagemaker.describe_workteam(WorkteamName=workteamName)['Workteam']['SubDomain'])

Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!
https://nm77hcfvpb.labeling.us-east-1.sagemaker.aws


# Check the status of human loops if the workers have completed labelling

In [30]:
completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i_runtime_client.describe_human_loop(HumanLoopName=human_loop_name)
    print(f'HumanLoop Name: {human_loop_name}')
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
    print('\n')
    
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)

HumanLoop Name: c6bc21a2-cfa9-41dd-b28d-90264c6b8f0a
HumanLoop Status: Completed
HumanLoop Output Destination: {'OutputS3Uri': 's3://comprehend-demo0804/output/custom-classify2c31660c-fcec-11ea-b54e-0f3f86ad49f5-fd-a2i/2020/09/22/17/08/20/c6bc21a2-cfa9-41dd-b28d-90264c6b8f0a/output.json'}




# Review the labelling results in Amazon S3

In [34]:
# I selected the last output.json entry from the list of human loops that were completed above
s3 = boto3.client('s3')

s3obj = s3.get_object(Bucket=BUCKET, Key='output/custom-classify2c31660c-fcec-11ea-b54e-0f3f86ad49f5-fd-a2i/2020/09/22/17/08/20/c6bc21a2-cfa9-41dd-b28d-90264c6b8f0a/output.json')
s3data = s3obj['Body'].read().decode('utf-8')
abc = json.loads(s3data)
print(str(abc))

{'flowDefinitionArn': 'arn:aws:sagemaker:us-east-1:186389221476:flow-definition/custom-classify2c31660c-fcec-11ea-b54e-0f3f86ad49f5-fd-a2i', 'humanAnswers': [{'answerContent': {'category': {'labels': ['World']}}, 'submissionTime': '2020-09-22T17:08:37.840Z', 'workerId': 'a4d8700afb91b37f', 'workerMetadata': {'identityData': {'identityProviderType': 'Cognito', 'issuer': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_VHJkg7Dra', 'sub': 'd0b307cb-3a7d-483f-9933-e6bb2f2a2aec'}}}], 'humanLoopName': 'c6bc21a2-cfa9-41dd-b28d-90264c6b8f0a', 'inputContent': {'taskObject': 'MS Dhoni retired from cricket'}}
