# Easily set up human review of your NLP-based Entity Recognition workflows with Amazon SageMaker Ground Truth, Amazon Comprehend AutoML and Amazon Augmented AI (A2I)


1. [Introduction](#Introduction)
2. [Solution Overview](#Solution-Overview)
3. [Pre-processing input documents](#Step1---Pre-processing-of-input-documents)
4. [Named Entity Recognition Labeling using Amazon SageMaker Ground Truth](#Create-an-Amazon-SageMaker-Ground-Truth-Named-Entity-Recognition-Labeling-Job)
5. [Train an Amazon Comprehend Custom Entity Recognizer using the labeled dataset](#Train-an-Amazon-Comprehend-AutoML-model)
6. [Setup a Human Review workflow using Amazon Augmented AI](#Setup-a-Human-Review-loop-for-low-confidence-detections-using-Amazon-Augmented-AI)
7. [Conclusion](#Conclusion)


## Introduction

Amazon A2I provides built-in human review workflows for common machine learning use cases, such as NLP-based entity recognition from documents, which allows predictions from Amazon Comprehend AutoML to be reviewed easily. You can also create your own workflows for ML models built on Amazon SageMaker or any other tools. Using Amazon A2I, you can allow human reviewers to step in when a model is unable to make a high-confidence prediction, or to audit its predictions on an ongoing basis. Learn more <a href="https://aws.amazon.com/augmented-ai/">here</a>.

In this tutorial, we will first set up an NLP-based workflow for custom entity recognition by <a href="https://docs.aws.amazon.com/comprehend/latest/dg/custom-entity-recognition.html">Amazon Comprehend</a> from an input document, using a labeled dataset created by <a href="https://docs.aws.amazon.com/sagemaker/latest/dg/sms-named-entity-recg.html">Amazon SageMaker Ground Truth Named Entity Recognition</a>. We will then show how you can set up an Amazon A2I human loop with a flow definition to trigger a review task for low-confidence predictions.

For more in depth instructions, visit https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-getting-started.html

## Prerequisites
#### Before you proceed, verify that your SageMaker Execution Role has the right policies

* Comprehend Full Access
* SageMaker Full Access
* Your SageMaker Execution Role should already have access to S3. If not, add a policy that allows access to any S3 bucket.

#### Please verify that your SageMaker Execution Role has the following statement
Add iam:PassRole as an inline policy:
{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Action": [
                "iam:PassRole"
            ],
            "Effect": "Allow",
            "Resource": "*"
        }
    ]
}

#### Finally, you will need the following trust policy set up in your execution role.

{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "Service": [
          "sagemaker.amazonaws.com",
          "s3.amazonaws.com",
          "comprehend.amazonaws.com"
        ]
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
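
The trust policy above can also be checked programmatically before running the rest of the notebook. The following is a minimal sketch, not part of the original notebook: the helper name `missing_trust_principals` and the sample policy are hypothetical, and the function only inspects a policy document that you pass in as a Python dictionary.

```python
def missing_trust_principals(trust_policy, required_services):
    """Return the required service principals that are NOT allowed to assume the role."""
    allowed = set()
    for statement in trust_policy.get("Statement", []):
        if statement.get("Effect") != "Allow":
            continue
        actions = statement.get("Action", [])
        if isinstance(actions, str):
            actions = [actions]
        if "sts:AssumeRole" not in actions:
            continue
        principal = statement.get("Principal", {})
        if not isinstance(principal, dict):
            continue
        services = principal.get("Service", [])
        if isinstance(services, str):
            services = [services]
        allowed.update(services)
    return sorted(set(required_services) - allowed)


# Hypothetical policy that trusts only SageMaker
example_policy = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "sagemaker.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

missing = missing_trust_principals(
    example_policy,
    ["sagemaker.amazonaws.com", "comprehend.amazonaws.com"],
)
print(missing)
```

In a real account, the policy document could be read from the `AssumeRolePolicyDocument` field returned by the IAM `get_role` call, assuming your notebook role is allowed to call it.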

## Step 1 - Pre-processing of input documents

Declare some essential variables to be used throughout the notebook

In [None]:
# Let's declare commonly used variables and do some initial checks
import boto3
import os
import json
import random
import time
import sagemaker
import uuid


s3 = boto3.client('s3')
s3res = boto3.resource('s3')

BUCKET = "a2i-experiments"
role = sagemaker.get_execution_role()
region = boto3.session.Session().region_name
prefix = "a2i-comprehend-gtner" + str(uuid.uuid1())
print(prefix)

bucket_region = s3.head_bucket(Bucket=BUCKET)['ResponseMetadata']['HTTPHeaders']['x-amz-bucket-region']
assert bucket_region == region, "Your S3 bucket {} and this notebook need to be in the same region.".format(BUCKET)

def upload_to_s3(s3path, file):
    # Upload a local file to BUCKET under the key s3path; the file handle is closed afterwards
    with open(file, "rb") as data:
        s3res.Bucket(BUCKET).put_object(Key=s3path, Body=data)

For our example, let's say we are looking at Slack messages or a bunch of tickets in JIRA. We would like to know if they're related to an AWS offering. We will use Amazon SageMaker Ground Truth’s Named Entity Recognition labeling feature to label a SERVICE and/or VERSION entity from the input. We will then train an Amazon Comprehend Custom Entity Recognizer to recognize the entities from text like tweets or ticket comments. The sample dataset is provided in **data/rawinput/aws-service-offerings.txt**. As an optional step, read the file and strip the HTML tags, if any. Then convert it into a text document made up of multiple sentences and upload the processed document to an S3 bucket.

#### Note: 
If you would like to directly go to the Amazon Comprehend Custom Entity Recognition training and Amazon Augmented AI human review steps, please execute Steps 1a to 1c and then skip to Step 3a, to the cell that executes code to create the annotations file and continue from there. The output.manifest created by Amazon SageMaker Ground Truth in this notebook is already available in **data/output.manifest**. 

### Step 1a - Split the text into sentences for more clarity

We will use the regular expressions package to split the chunk of text from the input file into a set of sentences. This is particularly important when we use Amazon SageMaker Ground Truth for the Named Entity labeling task, because each sentence becomes one labeling item.

In [None]:
import re
alphabets= "([A-Za-z])"
prefixes = "(Mr|St|Mrs|Ms|Dr)[.]"
suffixes = "(Inc|Ltd|Jr|Sr|Co)"
starters = r"(Mr|Mrs|Ms|Dr|He\s|She\s|It\s|They\s|Their\s|Our\s|We\s|But\s|However\s|That\s|This\s|Wherever)"
acronyms = "([A-Z][.][A-Z][.](?:[A-Z][.])?)"
websites = "[.](com|net|org|io|gov)"

def split_into_sentences(text):
    text = " " + text + "  "
    text = text.replace("\n"," ")
    text = re.sub(prefixes,"\\1<prd>",text)
    text = re.sub(websites,"<prd>\\1",text)
    if "Ph.D" in text: text = text.replace("Ph.D.","Ph<prd>D<prd>")
    text = re.sub(r"\s" + alphabets + "[.] "," \\1<prd> ",text)
    text = re.sub(acronyms+" "+starters,"\\1<stop> \\2",text)
    text = re.sub(alphabets + "[.]" + alphabets + "[.]" + alphabets + "[.]","\\1<prd>\\2<prd>\\3<prd>",text)
    text = re.sub(alphabets + "[.]" + alphabets + "[.]","\\1<prd>\\2<prd>",text)
    text = re.sub(" "+suffixes+"[.] "+starters," \\1<stop> \\2",text)
    text = re.sub(" "+suffixes+"[.]"," \\1<prd>",text)
    text = re.sub(" " + alphabets + "[.]"," \\1<prd>",text)
    text = re.sub(r'(?<=\d)[\.](?=\d)','',text)
    if "”" in text: text = text.replace(".”","”.")
    if "\"" in text: text = text.replace(".\"","\".")
    if "!" in text: text = text.replace("!\"","\"!")
    if "?" in text: text = text.replace("?\"","\"?")
    text = text.replace(".",".<stop>")
    text = text.replace("?","?<stop>")
    text = text.replace("!","!<stop>")
    text = text.replace("<prd>",".")
    text = text.replace('"','')
    sentences = text.split("<stop>")
    sentences = sentences[:-1]
    sentences = [s.strip() for s in sentences]
    return sentences
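
The function above relies on one idea: periods that do not end a sentence are temporarily replaced by a `<prd>` placeholder, real sentence ends are marked with `<stop>`, and the placeholders are restored before splitting. The following reduced sketch illustrates only that idea with two of the rules (titles and website suffixes). The name `protect_and_split` is hypothetical, and the sketch is not a replacement for the full function.

```python
import re


def protect_and_split(text):
    # 1. Protect periods that belong to titles such as "Dr." or to website suffixes
    text = re.sub(r"\b(Mr|Mrs|Ms|Dr)[.]", r"\1<prd>", text)
    text = re.sub(r"[.](com|net|org|io|gov)", r"<prd>\1", text)
    # 2. Every remaining period is treated as the end of a sentence
    text = text.replace(".", ".<stop>")
    # 3. Restore the protected periods and split on the stop markers
    text = text.replace("<prd>", ".")
    return [s.strip() for s in text.split("<stop>") if s.strip()]


sentences = protect_and_split("Dr. Smith opened example.com today. It works.")
for sentence in sentences:
    print(sentence)
```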

### We will use the above function to create a list of sentences, which is needed to generate the input file. You can use the code below to generate the input file from multiple documents.

In [None]:
# Read every input file in data/rawinput (for example aws-service-offerings.txt) and split it into sentences

folderpath = r"data/rawinput" # the folder where your input files are
filepaths  = [os.path.join(folderpath, name) for name in os.listdir(folderpath) if not name.startswith('.')] # skip hidden files
print(filepaths)
all_files = []

for path in filepaths:
    with open(path, 'r') as f:
        # If your input text is HTML, strip the tags first (for example with your own strip_tags helper,
        # which is not defined in this notebook) and pass the stripped text to split_into_sentences
        structured_text = split_into_sentences(f.read())
        all_files.append(structured_text)

#### Create an input file with 200 entries to be used to generate the Ground Truth Named Entity Recognition labeling manifest


In [None]:

import csv
fnfull = "inputs.csv"
!rm -f inputs.csv # in case there is already a file with the same name
with open(fnfull, "w", encoding='utf-8') as ff:
    csv_writer = csv.writer(ff, delimiter='\n')
    for infile in all_files:
        for num, sentence in enumerate(infile):
            if num >= 200: # keep only the first 200 sentences of each file
                break
            csv_writer.writerow([sentence])
print(num)
s3_manifest_key = prefix + "/input/" + fnfull
upload_to_s3(s3_manifest_key, fnfull)

### Step 1b - Creating training and test dataset for Amazon Comprehend Custom Entity




In [None]:
# Create the training file - Comprehend requires a minimum of 1000 samples in the training file
fntrain = "train.csv"
!rm -f train.csv # in case there is already a file with the same name
with open(fntrain, "w", encoding='utf-8') as fn:
    csv_writer = csv.writer(fn, delimiter='\n')
    for infile in all_files:
        for sentence in infile:
            csv_writer.writerow([sentence])
s3_train_key = prefix + "/training/input/train/" + fntrain
upload_to_s3(s3_train_key, fntrain)

# Create the testing file - We will select 100+ rows after the training sample ends
fntest = "test.csv"
!rm -f test.csv # in case there is already a file with the same name
with open(fntest, "w", encoding='utf-8') as ft:
    csv_writer = csv.writer(ft, delimiter='\n')
    for infile in all_files:
        # iterate over the current file rather than over the last file that was read
        for num, sentence in enumerate(infile):
            if num > 1000:
                csv_writer.writerow([sentence])
                if num > 1200:
                    break
s3_test_key = prefix + "/training/input/test/" + fntest
upload_to_s3(s3_test_key, fntest)

 

## Step 2 - Create an Amazon SageMaker Ground Truth Named Entity Recognition Labeling Job

Now that we have processed our input file and converted it into a text file with multiple sentences, we can use this file to create a named entity recognition labeling job using Amazon SageMaker Ground Truth. The purpose is to annotate/label sentences within the input document as belonging to a custom entity that we define. There are some prerequisites to create the labeling job: a) we create a manifest file that Amazon SageMaker Ground Truth needs, b) we set up a labeling workforce, and c) we select a UI template that the workforce will use.

### Step 2a - Create a manifest file

In [None]:
# Create and upload the input manifest by appending a source tag to each of the lines in the input text file. 
# Ground Truth uses the manifest file to determine labeling tasks

manifest_name = prefix + '-text-input.manifest'
# remove existing file with the same name to avoid duplicate entries
!rm -f *.manifest
s3bucket = s3res.Bucket(BUCKET)

with open(manifest_name, 'w') as f:
    for fn in s3bucket.objects.filter(Prefix=prefix +'/input/'):
        fn_obj = s3res.Object(BUCKET, fn.key)
        for line in fn_obj.get()['Body'].read().splitlines():
            # json.dumps escapes quotes and backslashes so that every manifest line stays valid JSON
            f.write(json.dumps({"source": line.decode('utf-8')}) + '\n')
s3.upload_file(manifest_name, BUCKET, prefix + "/manifest/" + manifest_name)
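
Each line of the input manifest is a standalone JSON object with a `source` key that holds the text to label. As a small illustration, the sketch below builds such lines in memory. The helper name `to_manifest_lines` and the sample sentence are hypothetical, and `json.dumps` is used here so that quotes or backslashes in the text are escaped.

```python
import json


def to_manifest_lines(sentences):
    # One JSON object per non-empty sentence, in the {"source": "..."} shape
    return [json.dumps({"source": s}) for s in sentences if s.strip()]


lines = to_manifest_lines(['Amazon S3 stores "objects" in buckets.', ""])
for line in lines:
    print(line)
```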

### Note: Steps 2b and 2c below will be performed using the AWS Console

We will use the AWS Console to create the Private Labeling Workforce and start a Labeling job. You can use your own email address to receive the labeling task request and complete the labeling yourself as a private workforce labeler.

### Step 2b - Create a Private Labeling Workforce

With Amazon SageMaker Ground Truth, you can build high-quality training datasets for your machine learning models. With Ground Truth, you can use workers from either Amazon Mechanical Turk, a vendor company that you choose, or an internal, private workforce along with machine learning to enable you to create a labeled dataset. You can use the labeled dataset output from Ground Truth to train your own models, as a training dataset for an Amazon SageMaker model or in our case, we will use this labeled dataset to train an Amazon Comprehend Custom Entity Recognizer. 

This step requires you to use the AWS Console. However, we highly recommend that you follow it, especially when creating your own task with a custom dataset, label set, and template.

We will create a private workteam and add only one user (you) to it. Then, we will create a Ground Truth labeling job request to send the task to that workforce. You will then be able to see your annotation job and can even annotate the whole dataset yourself!

To create a private team:

1. Go to AWS Console > Amazon SageMaker > Labeling workforces
2. Click "Private" and then "Create private team".
3. Enter the desired name for your private workteam.
4. Enter your own email address in the "Email addresses" section.
5. Enter the name of your organization and a contact email to administer the private workteam.
6. Click "Create Private Team".
7. The AWS Console should now return to AWS Console > Amazon SageMaker > Labeling workforces. Your newly created team should be visible under "Private teams". Next to it you will see an ARN, which is a long string of the form arn:aws:sagemaker:region:account-id:workteam/private-crowd/team-name.
8. You should get an email from no-reply@verificationemail.com that contains your workforce username and password.
9. In AWS Console > Amazon SageMaker > Labeling workforces, click on the URL in Labeling portal sign-in URL. Use the email/password combination from Step 8 to log in (you will be asked to create a new, non-default password).
10. This is your private worker's interface. When we create the labeling job in Step 2c below, your task should appear in this window. You can invite your colleagues to participate in the labeling job by clicking the "Invite new workers" button.

The <a href="https://docs.aws.amazon.com/sagemaker/latest/dg/sms-workforce-management.html" target="_blank">Amazon SageMaker Ground Truth</a> documentation has more details on the management of private workteams.

### Step 2c - Create and Start the Amazon SageMaker Ground Truth Labeling job

#### Setup a Labeling Job

1. Go to AWS Console > Amazon SageMaker > Labeling Jobs (under Ground Truth heading on the left pane)
2. Click on Create Labeling Job
3. Execute the cell below and use the labeling job name displayed for the Job Name 
4. For the Input Dataset Location, provide the S3 location of the manifest file you created in Step 2a
5. Provide an S3 bucket with an "output" prefix (for example, s3://bucket-name/output) in the Output Dataset Location
6. In the IAM role field, choose - Create a new Role and select "Any S3 Bucket" and click Create
7. In the Task Type field, select - Text and select - Named Entity Recognition
8. Click on Next at the bottom of the page
9. In the next page, select worker type as "Private"
10. In the Private Teams listbox, select the team you created in Step 2b
11. In the Named Entity Recognition Labeling Tool section, do the following:
    a. In the text box that says "Enter a brief description of the task", type "Highlight the word or group of words and select the corresponding most appropriate label from the right"
    b. In the box on the left, clear out the instructions and type "Your labeling will be used to train a Machine Learning model for predictions. Please think carefully on the most appropriate label for the word selection. Remember to highlight at least 10 entries for each Label" and select Bold Italics
    c. In the Labels section, type the Label names you want to display to your workforce. As a best practice provide 10+ Labels that your workforce will use.
    d. Click Create


#### Start Labeling

1. You/your workforce should have received an email as mentioned in point 8 in Step 2b above
2. Login to the URL provided with the username and password
3. This will take you to the Labeling Task UI. Complete the Labeling tasks by selecting labels for groups of words and clicking on Submit
4. When all entries have been labeled, the UI will automatically exit
5. Go back to AWS Console > Amazon SageMaker > Labeling Jobs and check the status of Labeling Job
6. Please wait until the status reflects "Complete"

#### Verify annotation outputs
Go to the S3 bucket location mentioned in point 5 in Setup a Labeling Job above and review the outputs. Ground Truth creates several directories in your Amazon S3 output path. These directories contain the results of your labeling job and other artifacts of the job. The top-level directory for a labeling job has the same name as your labeling job, and the output directories are placed beneath it. Since we did not use Active Learning in our example, we will have two directories: annotations and manifests.

Annotations: The annotations directory contains all of the annotations made by the workforce. These are the responses from individual workers that have not been consolidated into a single label for the data object.

Manifests: The manifests directory contains the output manifest from your labeling job. There is one subdirectory in the manifest directory, output. The output directory contains the output manifest file for your labeling job. The file is named output.manifest.

Please go to your S3 bucket and navigate to "output/directory with your labeling job name/manifests/output/output.manifest" to review the annotated file
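To get a feel for what a line of output.manifest contains, the sketch below parses a single made-up line. It assumes, as the conversion code later in this notebook does, that the entities sit under the labeling job name in `annotations` > `entities` with `startOffset`, `endOffset` and `label` fields. It also assumes that `endOffset` is exclusive. The job name `my-labeling-job`, the sample sentence and the helper `extract_entities` are hypothetical.

```python
import json


def extract_entities(manifest_line, job_name):
    # Return (selected text, label) pairs for one output.manifest line
    record = json.loads(manifest_line)
    source = record["source"]
    entities = record.get(job_name, {}).get("annotations", {}).get("entities", [])
    # Assumption: endOffset is exclusive, so a plain slice recovers the labeled text
    return [(source[e["startOffset"]:e["endOffset"]], e["label"]) for e in entities]


# Made-up manifest line for illustration only
sample_line = json.dumps({
    "source": "Amazon S3 version 2 is available.",
    "my-labeling-job": {
        "annotations": {
            "entities": [
                {"startOffset": 0, "endOffset": 9, "label": "service"},
                {"startOffset": 10, "endOffset": 19, "label": "version"},
            ]
        }
    },
})

labeled_spans = extract_entities(sample_line, "my-labeling-job")
print(labeled_spans)
```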

## Step 3 - Train an Amazon Comprehend AutoML model

We will now use the annotated dataset created by Amazon SageMaker Ground Truth in Step 2 to train an Amazon Comprehend Custom Entity Recognizer. We will have to make minor adjustments to the format of the annotated dataset to feed as an input for training the Recognizer. 

### Step 3a - Process the annotated dataset

We will extract and transform the content we need from the annotated dataset. As per guidelines in the Amazon Comprehend documentation:
   1. A minimum of 200 annotations are needed per entity to train a model for custom entity recognition.
   2. Your annotations should be in a properly configured CSV file to minimize the chance of problems with the annotations file. The following must be true:
      a. UTF-8 encoding must be explicitly specified, even if it's used as a default in most cases.
      b. It must include the column names: File, Line, Begin Offset, End Offset, Type.
      c. We highly recommend that CSV input files are generated programmatically to avoid potential issues.

#### Note: If you don't have a manifest file, you can use the output.manifest created for this notebook from the data folder

#### For more details on Amazon Comprehend Custom Entity Recognizer inputs refer to <a href='https://docs.aws.amazon.com/comprehend/latest/dg/training-recognizers.html'>this link</a>
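
The CSV requirements listed above can be sketched as a small in-memory builder. This is an illustration under assumptions rather than the notebook's conversion code: the helper name `build_annotations_csv`, the sample row and the offset sanity check are hypothetical additions.

```python
import csv
import io

# Column names required in the annotations file
EXPECTED_HEADER = ["File", "Line", "Begin Offset", "End Offset", "Type"]


def build_annotations_csv(rows):
    # rows: iterable of (file_name, line_number, begin_offset, end_offset, entity_type)
    buffer = io.StringIO()
    writer = csv.writer(buffer, lineterminator="\n")
    writer.writerow(EXPECTED_HEADER)
    for file_name, line_no, begin, end, entity_type in rows:
        if not 0 <= begin < end:
            raise ValueError("invalid offsets: {} to {}".format(begin, end))
        writer.writerow([file_name, line_no, begin, end, entity_type.upper()])
    return buffer.getvalue()


csv_text = build_annotations_csv([("train.csv", 0, 0, 9, "service")])
print(csv_text)
```

When the text is written to disk, opening the file with an explicit `encoding="utf-8"` covers the encoding requirement.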

In [None]:
# Let's download the output.manifest file for format conversion
labeling_job_name = '<your labeling job name>'
os.makedirs('data/groundtruth', exist_ok=True) # make sure the local target folder exists
s3.download_file(BUCKET, 'output/' + labeling_job_name + '/manifests/output/output.manifest', 'data/groundtruth/output.manifest')

#### Let's use the Python csv module to create an annotations file by parsing the output manifest file created by Ground Truth

In [None]:
# Read the output manifest json and convert into a csv format as expected by Amazon Comprehend Custom Entity Recognizer
import json
import csv
# Here we specify the labeling job name from the output.manifest file - the one below is what was used in the example output.manifest included
labeling_job_name = 'a2i-comprehend-gtner7c0bf6de-b5a1-11ea-bf5f-12acee018271'
# this will be the file that will be written by the format conversion code block below
csvout = 'annotations.csv'

with open(csvout, 'w', encoding="utf-8", newline='') as nf:
    csv_writer = csv.writer(nf)
    csv_writer.writerow(["File", "Line", "Begin Offset", "End Offset", "Type"])
    with open("data/groundtruth/output.manifest", "r") as fr:
        # Each manifest line is one JSON record; its position is used as the line number in the training file
        for num, line in enumerate(fr):
            lj = json.loads(line)
            #print(str(lj))
            if lj and labeling_job_name in lj:
                for ent in lj[labeling_job_name]['annotations']['entities']:
                    csv_writer.writerow([fntrain,num,ent['startOffset'],ent['endOffset'],ent['label'].upper()])

s3_annot_key = "output/" + labeling_job_name + "/comprehend/" + csvout

upload_to_s3(s3_annot_key, csvout)

### Step 3b - Setup an Amazon Comprehend Custom Entity Recognizer

Amazon Comprehend's custom entity recognition enables you to analyze your documents to find entities specific to your needs, rather than limiting you to the preset entity types already available. You can identify almost any kind of entity, simply by providing a sufficient number of details to train your model effectively.

The training process usually requires extensive knowledge of machine learning (ML) and a complex process for model optimization. Amazon Comprehend automates this for you using a technique called transfer learning to build your model on a sophisticated general-purpose entities recognition model framework. With this in place, all you need to supply is the data. However, it's important that you supply it with high quality data as input. Without good data the model won't learn how to correctly identify entities.

You can choose one of two ways to provide data to Amazon Comprehend in order to train a custom entity recognition model:

**Annotations**: This uses an annotation list that provides the location of your entities in a large number of documents so Amazon Comprehend can train on both the entity and its context.

**Entity Lists**: This provides only limited context, and uses only a list of the specific entities so Amazon Comprehend can train to identify the custom entity.

For our experiment, we created an annotation manifest using an Amazon SageMaker Ground Truth Named Entity Recognition labeling job in **Step 2c**, and formatted it to the CSV structure that the Amazon Comprehend Custom Entity Recognizer expects in **Step 3a**.

In [None]:
comprehend = boto3.client('comprehend')

In [None]:
s3_train_channel = 's3://{}/{}'.format(BUCKET, s3_train_key)
s3_annot_channel = 's3://{}/{}'.format(BUCKET, s3_annot_key)

In [None]:
custom_entity_request = {

      "Documents": { 
         "S3Uri": s3_train_channel
      },
      "Annotations": { 
         "S3Uri": s3_annot_channel
      },
      "EntityTypes": [
                {
                    "Type": "SERVICE"
                },
                {
                    "Type": "VERSION"
                }
      ]
}

#### Create the Entity Recognizer

In [None]:
import datetime

id = str(datetime.datetime.now().strftime("%s"))
create_custom_entity_response = comprehend.create_entity_recognizer(
        RecognizerName = prefix + "-CER", 
        DataAccessRoleArn = role,
        InputDataConfig = custom_entity_request,
        LanguageCode = "en"
)

#### Let's review the status of the training job in 1 minute increments
For a sample of 1,000 entries, training should typically complete within 15 minutes.

In [None]:
jobArn = create_custom_entity_response['EntityRecognizerArn']

max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_custom_recognizer = comprehend.describe_entity_recognizer(
        EntityRecognizerArn = jobArn
    )
    status = describe_custom_recognizer["EntityRecognizerProperties"]["Status"]
    print("Custom entity recognizer: {}".format(status))
    
    if status == "TRAINED" or status == "IN_ERROR":
        break
        
    time.sleep(60)

### Step 3c - Test the Amazon Comprehend Custom Entity Recognizer
We now use the StartEntitiesDetectionJob operation to detect custom entities in our documents. Using this operation, you provide the same information as you would when detecting preset entities. However, in addition to the input and output locations (S3 buckets), you also provide the EntityRecognizerArn, which is the Amazon Resource Name (ARN) of the trained model. This ARN is supplied by the response to the CreateEntityRecognizer operation.

You can examine one document or many, and each model can be trained on up to 12 custom entities at a time. You can search for up to 12 entities per StartEntitiesDetectionJob operation.


#### Let's first look at the Recognizer metrics

In [None]:
print(json.dumps(describe_custom_recognizer["EntityRecognizerProperties"]["RecognizerMetadata"]["EntityTypes"], indent=2, default=str))

With more training samples and annotations per entity type, we can improve the Evaluation Metrics for our Recognizer. Please refer to <a href="https://docs.aws.amazon.com/comprehend/latest/dg/cer-annotation.html">this link</a> for best practices during training.

#### Execute the Entity Detection job to get some predictions on our test dataset

In [None]:
s3_test_channel = 's3://{}/{}'.format(BUCKET, s3_test_key)
s3_output_test_data = 's3://{}/{}'.format(BUCKET, "output/testresults/")
test_response = comprehend.start_entities_detection_job(
    InputDataConfig={
        'S3Uri': s3_test_channel,
        'InputFormat': 'ONE_DOC_PER_LINE'
    },
    OutputDataConfig={
        'S3Uri': s3_output_test_data
    },
    DataAccessRoleArn=role,
    JobName='a2i-comprehend-gt-blog',
    EntityRecognizerArn=jobArn,
    LanguageCode='en'
)

#### Let's monitor the status of our job in 1 minute intervals

In [None]:
jobId = test_response['JobId']
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_job = comprehend.describe_entities_detection_job(
        JobId = jobId
    )
    status = describe_job["EntitiesDetectionJobProperties"]["JobStatus"]
    print("Job Status: {}".format(status))
    
    if status == "COMPLETED" or status == "FAILED":
        break
        
    time.sleep(60)
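
The training and detection polling loops above follow the same pattern: call a describe operation, stop on a terminal status, otherwise sleep and retry until a deadline. A minimal reusable sketch of that pattern is shown below. The helper name `wait_for_status` and its parameters are hypothetical, and the status source is passed in as a function so that the example runs without any AWS calls.

```python
import time


def wait_for_status(get_status, terminal_states, interval_seconds=60,
                    timeout_seconds=3 * 60 * 60, sleep=time.sleep):
    # Poll get_status() until it returns a terminal state or the timeout expires
    deadline = time.time() + timeout_seconds
    while True:
        status = get_status()
        print("Status: {}".format(status))
        if status in terminal_states:
            return status
        if time.time() >= deadline:
            raise TimeoutError("no terminal status within {} seconds".format(timeout_seconds))
        sleep(interval_seconds)


# Stand-in for a real describe call, so the sketch can run offline
fake_statuses = iter(["SUBMITTED", "TRAINING", "TRAINED"])
final_status = wait_for_status(
    lambda: next(fake_statuses),
    {"TRAINED", "IN_ERROR"},
    sleep=lambda seconds: None,  # skip the real waiting in this demonstration
)
```

With the Comprehend client from this notebook, `get_status` could wrap the `describe_entity_recognizer` or `describe_entities_detection_job` call and return the status field read in the loops above.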

#### Let's review the test results

In [None]:
#Download the test output to this notebook
job_output = describe_job["EntitiesDetectionJobProperties"]["OutputDataConfig"]["S3Uri"]
path_prefix = 's3://{}/'.format(BUCKET)
# Strip the "s3://<bucket>/" prefix from the URI to get the object key
job_key = job_output[len(path_prefix):]

s3res.Bucket(BUCKET).download_file(job_key, 'output.tar.gz')
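
The download call needs the object key, while the job description returns a full S3 URI. As an alternative illustration of that conversion, the sketch below splits a URI with the standard library's `urlparse`. The helper name `split_s3_uri` and the example URI are hypothetical.

```python
from urllib.parse import urlparse


def split_s3_uri(uri):
    # Split "s3://bucket/key/parts" into the bucket name and the object key
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError("not an S3 URI: {}".format(uri))
    return parsed.netloc, parsed.path.lstrip("/")


bucket_name, object_key = split_s3_uri(
    "s3://example-bucket/output/testresults/job-id/output/output.tar.gz"
)
print(bucket_name)
print(object_key)
```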


In [None]:
!tar xvzf output.tar.gz

In [None]:
#Display the results from the detection
import json

data = []
source_lines = {} # cache of the input file contents, keyed by file name
with open('output', 'r') as results:
    for line in results:
        record = json.loads(line)
        entities = record['Entities']
        file = record['File']
        ln = record['Line']
        if file not in source_lines:
            with open(file, 'r', encoding='utf-8') as rd:
                source_lines[file] = rd.readlines()
        orig_text = source_lines[file][ln]
        #print(line)
        # Only the first entity detected on each line is reported
        if entities:
            data.append({'ORIGINAL_TEXT': orig_text, 'CONFIDENCE_SCORE': round(entities[0]['Score']*100,0),'END_OFFSET': entities[0]['EndOffset'], 'BEGIN_OFFSET': entities[0]['BeginOffset'], 'SELECTED_TEXT': entities[0]['Text'], 'ENTITY': entities[0]['Type']})
for line in data:
    print(line)

## Step 4 - Setup a Human Review loop for low confidence detections using Amazon Augmented AI

Amazon Augmented AI (Amazon A2I) makes it easy to build the workflows required for human review of ML predictions. Amazon A2I brings human review to all developers, removing the undifferentiated heavy lifting associated with building human review systems or managing large numbers of human reviewers.

To incorporate Amazon A2I into your human review workflows, you need three resources:

**A worker task template** to create a worker UI. The worker UI displays your input data, such as documents or images, and instructions to workers. It also provides interactive tools that the worker uses to complete your tasks. For more information, see <a href="https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-instructions-overview.html">A2I instructions overview</a>

**A human review workflow**, also referred to as a flow definition. You use the flow definition to configure your human workforce and provide information about how to accomplish the human review task. For built-in task types, you also use the flow definition to identify the conditions under which a review human loop is triggered. For example, Amazon Rekognition can perform image content moderation using machine learning. You can use the flow definition to specify that an image will be sent to a human for content moderation review if Amazon Rekognition's confidence score output is low for your use case. You can create a flow definition in the Amazon Augmented AI console or with the Amazon A2I APIs. To learn more about both of these options, see <a href="https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html">create flow definition</a>.

**A human loop** to start your human review workflow. When you use one of the built-in task types, the corresponding AWS service creates and starts a human loop on your behalf when the conditions specified in your flow definition are met or for each object if no conditions were specified. When a human loop is triggered, human review tasks are sent to the workers as specified in the flow definition.

When using a custom task type, you start a human loop using the Amazon Augmented AI Runtime API. When you call StartHumanLoop in your custom application, a task is sent to human reviewers.
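For a custom task type, the calling code decides which predictions go to reviewers. The sketch below shows one way this could look for the detection results printed in Step 3c: it keeps only records below a confidence threshold and builds the arguments for a StartHumanLoop call without sending anything. The helper name, the threshold of 90, the placeholder flow definition ARN and the sample records are all hypothetical. The input keys `originalText` and `entities` are an assumption that must match whatever your worker task template reads from `task.input`.

```python
import json
import uuid


def build_human_loop_requests(detections, flow_definition_arn, threshold=90):
    # Build StartHumanLoop arguments for every detection below the confidence threshold
    requests = []
    for det in detections:
        if det["CONFIDENCE_SCORE"] >= threshold:
            continue  # confident enough, no human review needed
        input_content = {
            "originalText": det["ORIGINAL_TEXT"],
            "entities": det["SELECTED_TEXT"],
        }
        requests.append({
            "HumanLoopName": "ner-review-" + str(uuid.uuid4()),
            "FlowDefinitionArn": flow_definition_arn,
            "HumanLoopInput": {"InputContent": json.dumps(input_content)},
        })
    return requests


# Made-up detections in the same shape as the records printed in Step 3c
sample_detections = [
    {"ORIGINAL_TEXT": "Amazon S3 is durable.", "SELECTED_TEXT": "Amazon S3", "CONFIDENCE_SCORE": 99.0},
    {"ORIGINAL_TEXT": "We use Kendra v2.", "SELECTED_TEXT": "Kendra", "CONFIDENCE_SCORE": 61.0},
]
loop_requests = build_human_loop_requests(sample_detections, "<your flow definition ARN>")
print(len(loop_requests))
```

Each dictionary could then be passed as keyword arguments to `start_human_loop` on a `sagemaker-a2i-runtime` client once a real flow definition ARN is available.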

### Step 4a - Workteam or Workforce setup

A workforce is the group of workers that you have selected to label your dataset. You can choose either the Amazon Mechanical Turk workforce, a vendor-managed workforce, or you can create your own private workforce for human reviews. Whichever workforce type you choose, Amazon Augmented AI takes care of sending tasks to workers.

When you use a private workforce, you also create work teams, a group of workers from your workforce that are assigned to Amazon Augmented AI human review tasks. You can have multiple work teams and can assign one or more work teams to each job.

To create your Workteam, visit the instructions <a href="https://docs.aws.amazon.com/sagemaker/latest/dg/sms-workforce-management.html">here</a> 

After you have created your workteam, replace the "<Enter your workteam ARN>" placeholder below with its ARN.

In [None]:
import botocore

REGION = region # reuse the region detected in Step 1 so all resources stay in the same region as the S3 bucket
WORKTEAM_ARN= "<Enter your workteam ARN>"
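
Because the placeholder above must be replaced by hand, a quick format check can catch a forgotten or mistyped value before any API call is made. The following sketch is a hypothetical helper, not part of the A2I API: the regular expression encodes the usual ARN layout (partition, service, region, 12-digit account ID, resource) and the example ARN is made up.

```python
import re

# arn:<partition>:sagemaker:<region>:<12-digit account id>:workteam/<workforce>/<team name>
WORKTEAM_ARN_PATTERN = re.compile(
    r"^arn:aws[a-z-]*:sagemaker:[a-z0-9-]+:\d{12}:workteam/[^/]+/[^/]+$"
)


def looks_like_workteam_arn(value):
    # Return True if the value has the shape of a SageMaker workteam ARN
    return bool(WORKTEAM_ARN_PATTERN.match(value))


print(looks_like_workteam_arn("arn:aws:sagemaker:us-east-1:123456789012:workteam/private-crowd/my-team"))
print(looks_like_workteam_arn("<Enter your workteam ARN>"))
```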

Let's set up the clients for Amazon SageMaker and the Amazon A2I Runtime.

In [None]:
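import boto3
import io
import json
import uuid
import botocore
import time

# Amazon SageMaker client
# Note: this rebinds the name "sagemaker", which until now referred to the SageMaker Python SDK module imported in Step 1
sagemaker = boto3.client('sagemaker', REGION)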


# A2I Runtime client
a2i_runtime_client = boto3.client('sagemaker-a2i-runtime', REGION)

In [None]:
import pprint

# Pretty print setup
pp = pprint.PrettyPrinter(indent=2)

# Function to pretty-print AWS SDK responses
def print_response(response):
    if 'ResponseMetadata' in response:
        del response['ResponseMetadata']
    pp.pprint(response)

### Step 4b - Create Human Task UI

Create a human task UI resource by providing a UI template written in Liquid HTML. This template is rendered for the human workers whenever a human loop is required.

Below we've provided a simple demo template that is compatible with Amazon Comprehend Entity detection.

For over 70 pre built UIs, check: https://github.com/aws-samples/amazon-a2i-sample-task-uis

In [None]:
template = """
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<style>
    .highlight {
        background-color: yellow;
    }
</style>

<crowd-entity-annotation
        name="crowd-entity-annotation"
        header="Highlight parts of the text below"
        labels="[{'label': 'service', 'fullDisplayName': 'Service'}, {'label': 'version', 'fullDisplayName': 'Version'}]"
        text="{{ task.input.originalText }}"
>
    <full-instructions header="Named entity recognition instructions">
        <ol>
            <li><strong>Read</strong> the text carefully.</li>
            <li><strong>Highlight</strong> words, phrases, or sections of the text.</li>
            <li><strong>Choose</strong> the label that best matches what you have highlighted.</li>
            <li>To <strong>change</strong> a label, choose highlighted text and select a new label.</li>
            <li>To <strong>remove</strong> a label from highlighted text, choose the X next to the abbreviated label name on the highlighted text.</li>
            <li>You can select all of a previously highlighted text, but not a portion of it.</li>
        </ol>
    </full-instructions>

    <short-instructions>
        Select the word or words in the displayed text corresponding to the entity, label it and click submit
    </short-instructions>

    <div id="recognizedEntities" style="margin-top: 20px">
                <h3>Label the Entity below in the text above</h3>
                <p>{{ task.input.entities }}</p>
    </div>
</crowd-entity-annotation>

<script>

    function highlight(text) {
        var inputText = document.getElementById("inputText");
        var innerHTML = inputText.innerHTML;
        var index = innerHTML.indexOf(text);
        if (index >= 0) {
            innerHTML = innerHTML.substring(0,index) + "<span class='highlight'>" + innerHTML.substring(index,index+text.length) + "</span>" + innerHTML.substring(index + text.length);
            inputText.innerHTML = innerHTML;
        }
    }

    document.addEventListener('all-crowd-elements-ready', () => {
        document
            .querySelector('crowd-entity-annotation')
            .shadowRoot
            .querySelector('crowd-form')
            .form
            .appendChild(recognizedEntities);
    });
</script>
"""
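
The template above reads its values from `task.input.originalText` and `task.input.entities`, so the JSON we later send as the human loop input needs keys with exactly those names. As a quick sanity check, the sketch below lists the `task.input.<name>` fields a template refers to. The helper name `template_input_fields` is hypothetical and the regular expression only covers simple references like the ones used here, so treat it as an illustration rather than a full Liquid parser.

```python
import re

def template_input_fields(template_text):
    """Return the sorted field names referenced as task.input.<name> in a template."""
    pattern = r"task\.input\.([A-Za-z_][A-Za-z0-9_]*)"
    return sorted(set(re.findall(pattern, template_text)))

# A shortened stand-in for the template defined above (illustrative only)
sample_template = """
<crowd-entity-annotation text="{{ task.input.originalText }}">
    <p>{{ task.input.entities }}</p>
</crowd-entity-annotation>
"""

print(template_input_fields(sample_template))
```

In the notebook you could call `template_input_fields(template)` and compare the result with the keys you put into the human loop input.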

### Step 4c - Create a Worker Task Template Creator Function

This function is a higher-level abstraction over the SageMaker client's `create_human_task_ui` call. It creates the worker task template that we will use in the next step to create a human review workflow.


In [None]:
def create_task_ui():
    '''
    Creates a Human Task UI resource named by the global taskUIName.

    Returns:
    dict: the API response, which contains the HumanTaskUiArn
    '''
    response = sagemaker.create_human_task_ui(
        HumanTaskUiName=taskUIName,
        UiTemplate={'Content': template})
    return response

In [None]:
# Task UI name - this value must be unique per account and region. You can also provide your own value here.
taskUIName = prefix + '-ui' 

# Create task UI
humanTaskUiResponse = create_task_ui()
humanTaskUiArn = humanTaskUiResponse['HumanTaskUiArn']
print(humanTaskUiArn)
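
Because the task UI name must be unique, re-running the cell above with the same name is expected to fail. One way to make the step repeatable is sketched below: it tries to create the task UI and falls back to looking up the existing one. The helper name `get_or_create_task_ui` is hypothetical, and the sketch assumes the existing task UI is the one you want; it does not compare templates.

```python
def get_or_create_task_ui(sm_client, name, template_text):
    """Return the ARN of the named human task UI, creating it if it does not exist."""
    try:
        response = sm_client.create_human_task_ui(
            HumanTaskUiName=name,
            UiTemplate={'Content': template_text})
    except sm_client.exceptions.ResourceInUse:
        # Assumption: a task UI with this name already exists, so reuse it as-is
        response = sm_client.describe_human_task_ui(HumanTaskUiName=name)
    return response['HumanTaskUiArn']

# Usage in this notebook (requires AWS credentials):
# humanTaskUiArn = get_or_create_task_ui(sagemaker, taskUIName, template)
```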


### Step 4d - Creating the Flow Definition

In this section, we're going to create a flow definition. Flow definitions allow us to specify:

* The workforce that your tasks will be sent to.
* The instructions that your workforce will receive. This is called a worker task template.
* Where your output data will be stored.

This demo is going to use the API, but you can optionally create this workflow definition in the console as well.

For more details and instructions, see: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html.


In [None]:
# Flow definition name - this value must be unique per account and region. You can also provide your own value here.
flowDefinitionName = prefix + '-fd-a2i' 

create_workflow_definition_response = sagemaker.create_flow_definition(
        FlowDefinitionName= flowDefinitionName,
        RoleArn= role,
        HumanLoopConfig= {
            "WorkteamArn": WORKTEAM_ARN,
            "HumanTaskUiArn": humanTaskUiArn,
            "TaskCount": 1,
            "TaskDescription": "Label the context of words in the provided text as PERSON or THING",
            "TaskTitle": "Detect Context of words in Text"
        },
        OutputConfig={
            "S3OutputPath" : "s3://"+BUCKET+"/output"
        }
    )
flowDefinitionArn = create_workflow_definition_response['FlowDefinitionArn'] # let's save this ARN for future use

In [None]:
# Describe flow definition - status should be active
for x in range(60):
    describeFlowDefinitionResponse = sagemaker.describe_flow_definition(FlowDefinitionName=flowDefinitionName)
    print(describeFlowDefinitionResponse['FlowDefinitionStatus'])
    if (describeFlowDefinitionResponse['FlowDefinitionStatus'] == 'Active'):
        print("Flow Definition is active")
        break
    time.sleep(2)
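
The loop above simply stops after 60 attempts, even if the flow definition never becomes active. A slightly more defensive variant is sketched below: it returns once the status is `Active` and raises an error if the status is `Failed` or the wait times out. The function name `wait_for_flow_definition` and its default timings are assumptions made for this illustration.

```python
import time

def wait_for_flow_definition(sm_client, name, timeout_seconds=120, poll_seconds=2):
    """Poll until the flow definition is Active; raise if it fails or times out."""
    deadline = time.monotonic() + timeout_seconds
    while True:
        response = sm_client.describe_flow_definition(FlowDefinitionName=name)
        status = response['FlowDefinitionStatus']
        if status == 'Active':
            return response
        if status == 'Failed':
            reason = response.get('FailureReason', 'no reason reported')
            raise RuntimeError(f'Flow definition {name} failed: {reason}')
        if time.monotonic() >= deadline:
            raise TimeoutError(f'Flow definition {name} is still {status}')
        time.sleep(poll_seconds)

# Usage in this notebook (requires AWS credentials):
# wait_for_flow_definition(sagemaker, flowDefinitionName)
```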

In [None]:
print(flowDefinitionArn)

#### Let's set up the condition for triggering the human loop review

In [None]:
# Start a human loop for every detection whose confidence score is below the threshold

human_loops_started = []

CONFIDENCE_SCORE_THRESHOLD = 90
for line in data:
    print("Line is: " + str(line))
    if line['CONFIDENCE_SCORE'] < CONFIDENCE_SCORE_THRESHOLD:
        humanLoopName = str(uuid.uuid4())
        # These keys are read by the task UI template as task.input.<key>
        human_loop_input = {
            'labels': line['ENTITY'],
            'entities': line['ENTITY'],
            'originalText': line['ORIGINAL_TEXT']
        }
        start_loop_response = a2i_runtime_client.start_human_loop(
            HumanLoopName=humanLoopName,
            FlowDefinitionArn=flowDefinitionArn,
            HumanLoopInput={
                "InputContent": json.dumps(human_loop_input)
            }
        )
        print(human_loop_input)
        human_loops_started.append(humanLoopName)
        print(f'Score is less than the threshold of {CONFIDENCE_SCORE_THRESHOLD}')
        print(f'Starting human loop with name: {humanLoopName}  \n')
    else:
        print('No human loop created. \n')
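
If you want to test the trigger logic without calling AWS, it can help to separate the decision and the request construction from the API call. The sketch below does that under the assumption that each record has the same `ENTITY`, `ORIGINAL_TEXT` and `CONFIDENCE_SCORE` fields used in the cell above. The function names `needs_human_review` and `build_human_loop_request` are hypothetical.

```python
import json
import uuid

def needs_human_review(score, threshold):
    """Return True when the confidence score is below the review threshold."""
    return score < threshold

def build_human_loop_request(record, flow_definition_arn):
    """Build the keyword arguments for start_human_loop from one detection record."""
    payload = {
        'labels': record['ENTITY'],
        'entities': record['ENTITY'],
        'originalText': record['ORIGINAL_TEXT'],
    }
    return {
        'HumanLoopName': str(uuid.uuid4()),
        'FlowDefinitionArn': flow_definition_arn,
        'HumanLoopInput': {'InputContent': json.dumps(payload)},
    }

# Usage in this notebook (requires AWS credentials):
# if needs_human_review(line['CONFIDENCE_SCORE'], CONFIDENCE_SCORE_THRESHOLD):
#     a2i_runtime_client.start_human_loop(**build_human_loop_request(line, flowDefinitionArn))
```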
    
    

### Step 4e - Check human loop status and wait for reviewers to complete task

Let's check the status of each human loop we started and collect the ones that have completed.


In [None]:
completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i_runtime_client.describe_human_loop(HumanLoopName=human_loop_name)
    print(f'HumanLoop Name: {human_loop_name}')
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp.get("HumanLoopOutput")}')
    print('\n')
    
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)

Wait for workers to complete the tasks

In [None]:


workteamName = WORKTEAM_ARN[WORKTEAM_ARN.rfind('/') + 1:]
print("Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!")
print('https://' + sagemaker.describe_workteam(WorkteamName=workteamName)['Workteam']['SubDomain'])



Check the status of the human loops again

In [None]:
completed_human_loops = []
for human_loop_name in human_loops_started:
    resp = a2i_runtime_client.describe_human_loop(HumanLoopName=human_loop_name)
    print(f'HumanLoop Name: {human_loop_name}')
    print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
    print(f'HumanLoop Output Destination: {resp.get("HumanLoopOutput")}')
    print('\n')
    
    if resp["HumanLoopStatus"] == "Completed":
        completed_human_loops.append(resp)
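
Instead of re-running the status cell by hand, you could poll until every human loop has finished. The sketch below is one way to do that. It treats `Completed`, `Failed` and `Stopped` as final states and gives up after a timeout, returning whatever statuses it last saw. The function name `wait_for_human_loops` and the default timings are assumptions for this illustration.

```python
import time

TERMINAL_STATUSES = {'Completed', 'Failed', 'Stopped'}

def wait_for_human_loops(a2i_client, loop_names, timeout_seconds=600, poll_seconds=15):
    """Poll the given human loops until all are finished or the timeout passes.

    Returns a dict mapping each human loop name to its last observed status.
    """
    deadline = time.monotonic() + timeout_seconds
    statuses = {}
    while True:
        for name in loop_names:
            if statuses.get(name) in TERMINAL_STATUSES:
                continue  # already finished, no need to ask again
            resp = a2i_client.describe_human_loop(HumanLoopName=name)
            statuses[name] = resp['HumanLoopStatus']
        all_done = all(s in TERMINAL_STATUSES for s in statuses.values())
        if all_done or time.monotonic() >= deadline:
            return statuses
        time.sleep(poll_seconds)

# Usage in this notebook (requires AWS credentials):
# print(wait_for_human_loops(a2i_runtime_client, human_loops_started))
```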

#### Let's review the annotation output from our A2I labeling task

In [None]:
# The key below is the last output.json entry from the list of human loops completed above.
# Replace it with the output.json key of one of your own completed human loops.
s3 = boto3.client('s3')

s3obj = s3.get_object(Bucket=BUCKET, Key='output/a2i-comprehend-gtner9dcaba0e-1928-11eb-94ff-f91d18329acb-fd-a2i/2020/10/28/15/49/36/d1b9b32c-4b97-47fd-9f48-ca349f8a1278/output.json')
s3data = s3obj['Body'].read().decode('utf-8')
a2i_output = json.loads(s3data)
print(str(a2i_output['humanAnswers']))
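
The object key used above is specific to one run of this notebook. Each completed human loop reports its own output location, so a small helper can split that S3 URI into a bucket and a key for you. The sketch below assumes the location is available as `HumanLoopOutput['OutputS3Uri']` in the responses collected in `completed_human_loops`. The helper name `split_s3_uri` is hypothetical.

```python
from urllib.parse import urlparse

def split_s3_uri(uri):
    """Split an s3://bucket/key URI into a (bucket, key) tuple."""
    parsed = urlparse(uri)
    if parsed.scheme != 's3' or not parsed.netloc:
        raise ValueError(f'Not an S3 URI: {uri}')
    return parsed.netloc, parsed.path.lstrip('/')

# Usage in this notebook (requires AWS credentials and at least one completed loop):
# output_uri = completed_human_loops[-1]['HumanLoopOutput']['OutputS3Uri']
# bucket, key = split_s3_uri(output_uri)
# s3obj = s3.get_object(Bucket=bucket, Key=key)
```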


## Conclusion
And that's it. This notebook demonstrated how to set up an NLP workflow for custom entity recognition from a text corpus. We used Amazon SageMaker Ground Truth Named Entity Recognition to label entities, and we used the manifest file generated by Ground Truth to create annotations for training an Amazon Comprehend Custom Entity Recognizer. We then used Amazon Augmented AI to complete human reviews of low-confidence predictions by setting up a Task UI and a Flow Definition, and by using A2I's runtime APIs to trigger a human loop whenever a prediction's confidence score fell below a threshold. **The annotations generated by A2I can be used to update the annotations file we created to incrementally train the Amazon Comprehend Custom Entity Recognizer, to improve the accuracy of our model**. We are interested in how you would like to extend this notebook for your use case and welcome your feedback.
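
As a starting point for that last step, the sketch below turns reviewed spans into rows with the columns of a Comprehend entity annotations file (`File`, `Line`, `Begin Offset`, `End Offset`, `Type`). It rests on several assumptions you should verify against your own output.json: that each answer stores its spans under `answerContent['crowd-entity-annotation']['entities']` with `label`, `startOffset` and `endOffset` fields, that the offsets are relative to the line of text that was reviewed, and that the labels match the entity types used in training. The function names and the `file_name` and `line_number` arguments are hypothetical, since A2I does not know which training document and line the text came from.

```python
import csv
import io

def annotation_rows(a2i_output, file_name, line_number,
                    element_name='crowd-entity-annotation'):
    """Convert the reviewed spans in one A2I output document into annotation rows."""
    rows = []
    for answer in a2i_output.get('humanAnswers', []):
        content = answer.get('answerContent', {})
        for entity in content.get(element_name, {}).get('entities', []):
            rows.append([file_name, line_number,
                         entity['startOffset'], entity['endOffset'],
                         entity['label']])
    return rows

def rows_to_csv(rows):
    """Render annotation rows as CSV text with a header row."""
    buffer = io.StringIO()
    writer = csv.writer(buffer)
    writer.writerow(['File', 'Line', 'Begin Offset', 'End Offset', 'Type'])
    writer.writerows(rows)
    return buffer.getvalue()
```

For example, `rows_to_csv(annotation_rows(a2i_output, 'train.txt', 0))` would produce CSV text, where `a2i_output` is a parsed output.json and `'train.txt'` and `0` are placeholders for your own training file name and line number.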