# Amazon Augmented AI (Amazon A2I) integration with Tabular Data [Example]

1. [Introduction](#Introduction)
2. [Prerequisites](#Prerequisites)
    1. [Workteam](#Workteam)
    2. [Permissions](#Notebook-Permission)
3. [Client Setup](#Client-Setup)
4. [Create Control Plane Resources](#Create-Control-Plane-Resources)
    1. [Create Human Task UI](#Create-Human-Task-UI)
    2. [Create Flow Definition](#Create-Flow-Definition)
5. [Starting Human Loops](#Scenario-1-:-When-Activation-Conditions-are-met-,-and-HumanLoop-is-created)
    1. [Wait For Workers to Complete Task](#Wait-For-Workers-to-Complete-Task)
    2. [Check Status of Human Loop](#Check-Status-of-Human-Loop)
    3. [View Task Results](#View-Task-Results)


## Introduction

Amazon Augmented AI (Amazon A2I) makes it easy to build the workflows required for human review of ML predictions. Amazon A2I brings human review to all developers, removing the undifferentiated heavy lifting associated with building human review systems or managing large numbers of human reviewers. 

You can create your own workflows for ML models built on Amazon SageMaker or any other tools. Using Amazon A2I, you can allow human reviewers to step in when a model is unable to make a high confidence prediction or to audit its predictions on an on-going basis. 

Learn more here: https://aws.amazon.com/augmented-ai/

In this tutorial, we will show how you can use **Amazon A2I with Tabular data.** Tabular data is the most common form of data used by data scientists today for generating models. Use cases include, fraud detection, building customer propensity models, forecasting sales using regression etc. In many cases, data scientists often convert unstructured data such as text or images into structured tables that they then use for training models. 

Here we will first train a model and use the outputs of the trained model to build a human loop for review.

For more in depth instructions, visit https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-getting-started.html

To incorporate Amazon A2I into your human review workflows, you need three resources:

* A **worker task template** to create a worker UI. The worker UI displays your input data, such as documents or images, and instructions to workers. It also provides interactive tools that the worker uses to complete your tasks. For more information, see https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-instructions-overview.html

* A **human review workflow**, also referred to as a flow definition. You use the flow definition to configure your human workforce and provide information about how to accomplish the human review task. You can create a flow definition in the Amazon Augmented AI console or with Amazon A2I APIs. To learn more about both of these options, see https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html

* A **human loop** to start your human review workflow. When you use one of the built-in task types, the corresponding AWS service creates and starts a human loop on your behalf when the conditions specified in your flow definition are met or for each object if no conditions were specified. When a human loop is triggered, human review tasks are sent to the workers as specified in the flow definition.

When using a custom task type, as this tutorial will show, you start a human loop using the Amazon Augmented AI Runtime API. When you call `start_human_loop()` in your custom application, a task is sent to human reviewers.

### Install Latest SDK


In [None]:
# First, let's get the latest installations of our dependencies
!pip install --upgrade pip
!pip install boto3 --upgrade
!pip install -U botocore

## Setup
We need to set up the following data:
* `region` - Region to call A2I.
* `BUCKET` - A S3 bucket accessible by the given role
    * Used to store the sample images & output results
    * Must be within the same region A2I is called from
* `role` - The IAM role used as part of StartHumanLoop. By default, this notebook will use the execution role
* `workteam` - Group of people to send the work to

### Role and Permissions

The AWS IAM Role used to execute the notebook needs to have the following permissions:

* SagemakerFullAccess
* AmazonSageMakerMechanicalTurkAccess (if using MechanicalTurk as your Workforce)

In [None]:
from sagemaker import get_execution_role
import sagemaker

# Setting Role to the default SageMaker Execution Role
role = get_execution_role()
display(role)

In [None]:
import os
import boto3
import botocore

sess = sagemaker.Session() 

#bucket
BUCKET = sess.default_bucket() # or use a custom bucket if you created one. 
PREFIX = 'a2i-data'

#specify output path for artifacts
OUTPUT_PATH = f's3://{BUCKET}/a2i-results'

# Region 
region = boto3.session.Session().region_name
print(region)

## Tabular data with Amazon SageMaker

Before creating the template, we will load a tabular dataset, split the data into train and test, store the test data in Amazon S3, and train a machine learning model. The dataset we use is on Breast Cancer prediction and can be found here: [1] Dua, D. and Graff, C. (2019). UCI Machine Learning Repository [http://archive.ics.uci.edu/ml]. Irvine, CA: University of California, School of Information and Computer Science.

Based on the input features, we will first train a model to detect a benign or malignant label. 

Once the model is trained, we will create an endpoint, and generate some model predictions. We will then create a WorkerUI to load in our immutable test dataset as a table, and dynamically modify the verify and change predictions if needed.

In [None]:
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

def generatedf(split_ratio):
    """Loads the dataset into a dataframe and generates train/test splits"""
    data = load_breast_cancer()
    df = pd.DataFrame(data.data, columns = data.feature_names)
    df['label'] = data.target
    cols = list(df.columns)
    cols = cols[-1:] + cols[:-1]
    df = df[cols]
    train, test = train_test_split(df, test_size=split_ratio, random_state=42)
    return train, test

train_data, test_data = generatedf(0.2)
train_data.head()

In [None]:
#store the datasets locally
train_data.to_csv('train.csv',index = None, header=None)
test_data.to_csv('test.csv', index = None, header=None)

In [None]:
# load the data into S3
sess.upload_data('train.csv', bucket=BUCKET, key_prefix=os.path.join(PREFIX, 'train'))
sess.upload_data('test.csv', bucket=BUCKET, key_prefix=os.path.join(PREFIX, 'test'))

In [None]:
#load the train and test data filenames from Amazon S3
s3_input_train = sagemaker.s3_input(s3_data='s3://{}/{}/train'.format(BUCKET, PREFIX), content_type='csv')
s3_input_validation = sagemaker.s3_input(s3_data='s3://{}/{}/test/'.format(BUCKET, PREFIX), content_type='csv')
                                        

### Train and Deploy the model

SageMaker will set up the instance types needed and copy the data over to train the model. This may take about **3** minutes to complete training. Once the model is trained, we will deploy the model as an endpoint. Again, SageMaker will set up the instance required, copy the inference image and the inference code and create a HTTPS endpoint. This may take **4-5** minutes. For more details on how SageMaker creates an endpoint, visit: https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-hosting.html

In [None]:
from sagemaker.amazon.amazon_estimator import get_image_uri

container = get_image_uri(region, 'xgboost', '0.90-1')

xgb = sagemaker.estimator.Estimator(container,
                                    role, 
                                    train_instance_count=1, 
                                    train_instance_type='ml.m5.xlarge',
                                    output_path=OUTPUT_PATH,
                                    sagemaker_session=sess)

xgb.set_hyperparameters(max_depth=2,
                        eta=0.2,
                        gamma=4,
                        min_child_weight=6,
                        subsample=0.8,
                        silent=0,
                        objective='binary:logistic',
                        num_round=100,
                        eval_metric='auc')

xgb.fit({'train': s3_input_train, 
         'validation': s3_input_validation})

In [None]:
xgb_predictor = xgb.deploy(initial_instance_count = 1, instance_type = 'ml.m5.xlarge')

In [None]:
from sagemaker.predictor import csv_serializer
xgb_predictor.content_type = 'text/csv'
xgb_predictor.serializer = csv_serializer
xgb_predictor.deserializer = None

In [None]:
## Lets now run predictions on our test set and use it to create a table containing our outputs.
import numpy as np

def predict(data, model, rows=500):
    split_array = np.array_split(data, int(data.shape[0] / float(rows) + 1))
    predictions = ''
    for array in split_array:
        predictions = ','.join([predictions, model.predict(array).decode('utf-8')])

    return np.round(np.fromstring(predictions[1:], sep=','))

## Generate predictions on the test set for the difference models

predictions = predict(test_data[list(test_data.columns)[1:]].values, xgb_predictor)

In [None]:
predictions

### Creating human review Workteam or Workforce

A workforce is the group of workers that you have selected to label your dataset. You can choose either the Amazon Mechanical Turk workforce, a vendor-managed workforce, or you can create your own private workforce for human reviews. Whichever workforce type you choose, Amazon Augmented AI takes care of sending tasks to workers. 

When you use a private workforce, you also create work teams, a group of workers from your workforce that are assigned to Amazon Augmented AI human review tasks. You can have multiple work teams and can assign one or more work teams to each job.

To create your Workteam, visit the instructions here: https://docs.aws.amazon.com/sagemaker/latest/dg/sms-workforce-management.html

After you have created your workteam, replace YOUR_WORKTEAM_ARN below

In [None]:
WORKTEAM_ARN = 'arn:aws:sagemaker:us-east-2:{account_num}:workteam/private-crowd/stefan-team'#'YOUR_WORKTEAM_ARN'

Visit: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-permissions-security.html to add the necessary permissions to your role

### Client Setup

Here we are going to setup the rest of our clients. 

In [None]:
import io
import uuid
import time

timestamp = time.strftime("%Y-%m-%d-%H-%M-%S", time.gmtime())
# Amazon SageMaker client
sagemaker_client = boto3.client('sagemaker', region)

# Amazon Augment AI (A2I) client
a2i = boto3.client('sagemaker-a2i-runtime')

# Amazon S3 client 
s3 = boto3.client('s3', region)

# Flow definition name - this value is unique per account and region. You can also provide your own value here.
flowDefinitionName = 'fd-sagemaker-tabular-data-demo-' + timestamp

# Task UI name - this value is unique per account and region. You can also provide your own value here.
taskUIName = 'ui-sagemaker-tabular-data-demo-' + timestamp

## Create Control Plane Resources

### Create Human Task UI

Create a human task UI resource, giving a UI template in liquid html. This template will be rendered to the human workers whenever human loop is required.

For over 70 pre built UIs, check: https://github.com/aws-samples/amazon-a2i-sample-task-uis.

We will use the following template to render both the test dataset, as well as the model predictions

In [None]:
template = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>

<style>
  table, tr, th, td {
    border: 1px solid black;
    border-collapse: collapse;
    padding: 5px;
  }
</style>

<crowd-form>
    <div>
        <h1>Instructions</h1>
        <p>Please review the predictions in the Predictions table based on the input data table below, and make corrections where appropriate. </p>
        <p> Here are the labels: </p>
        <p> 0: Benign </p>
        <p> 1: Malignant </p>
    </div>
    <div>
      <h3> Breast cancer dataset </h3>
      <div id="my_table"> {{ task.input.table | skip_autoescape }} </div>
   </div>
    <br>
    <h1> Predictions Table </h1>
    <table>
      <tr>
        <th>ROW NUMBER</th>
        <th>MODEL PREDICTION</th>
        <th>AGREE/DISAGREE WITH ML RATING?</th>
        <th>YOUR PREDICTION</th>
        <th>CHANGE REASON </th>
      </tr>

      {% for pair in task.input.Pairs %}

        <tr>
          <td>{{ pair.row }}</td>
          <td><crowd-text-area name="predicted{{ forloop.index }}" value="{{ pair.prediction }}"></crowd-text-area></td>
          <td>
            <p>
              <input type="radio" id="agree{{ forloop.index }}" name="rating{{ forloop.index }}" value="agree" required>
              <label for="agree{{ forloop.index }}">Agree</label>
            </p>
            <p>
              <input type="radio" id="disagree{{ forloop.index }}" name="rating{{ forloop.index }}" value="disagree" required>
              <label for="disagree{{ forloop.index }}">Disagree</label>       
            </p> 
          </td>
          <td>
            <p>
            <input type="text" name="True Prediction" placeholder="Enter your Prediction" />
            </p>
           </td>
           <td>
            <p>
            <input type="text" name="Change Reason" placeholder="Explain why you changed the prediction" />
            </p>
           </td>
        </tr>

      {% endfor %}

    </table>
</crowd-form>
"""

def create_task_ui():
    '''
    Creates a Human Task UI resource.

    Returns:
    struct: HumanTaskUiArn
    '''
    response = sagemaker_client.create_human_task_ui(
        HumanTaskUiName=taskUIName,
        UiTemplate={'Content': template})
    return response

In [None]:
# Create task UI
humanTaskUiResponse = create_task_ui()
humanTaskUiArn = humanTaskUiResponse['HumanTaskUiArn']
print(humanTaskUiArn)

### Create the Flow Definition

In this section, we're going to create a flow definition definition. Flow Definitions allow us to specify:

* The workforce that your tasks will be sent to.
* The instructions that your workforce will receive. This is called a worker task template.
* The configuration of your worker tasks, including the number of workers that receive a task and time limits to complete tasks.
* Where your output data will be stored.

This demo is going to use the API, but you can optionally create this workflow definition in the console as well. 

For more details and instructions, see: https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html.

In [None]:
create_workflow_definition_response = sagemaker_client.create_flow_definition(
        FlowDefinitionName= flowDefinitionName,
        RoleArn= role,
        HumanLoopConfig= {
            "WorkteamArn": WORKTEAM_ARN,
            "HumanTaskUiArn": humanTaskUiArn,
            "TaskCount": 1,
            "TaskDescription": "Make sure the labels are correct",
            "TaskTitle": "tabular data a2i demo"
        },
        OutputConfig={
            "S3OutputPath" : OUTPUT_PATH
        }
    )
flowDefinitionArn = create_workflow_definition_response['FlowDefinitionArn'] # let's save this ARN for future use

In [None]:
# Describe flow definition - status should be active
for x in range(60):
    describeFlowDefinitionResponse = sagemaker_client.describe_flow_definition(FlowDefinitionName=flowDefinitionName)
    print(describeFlowDefinitionResponse['FlowDefinitionStatus'])
    if (describeFlowDefinitionResponse['FlowDefinitionStatus'] == 'Active'):
        print("Flow Definition is active")
        break
    time.sleep(2)

## Human Loops

Now that we have setup our Flow Definition, we are ready to start the human loop to have the reviewers asynchronously review the outputs generated by our model. First we need to create a dictionary containing our model outputs, so we can load it dynamically

In [None]:
item_list = [{'row': "ROW_{}".format(x), 'prediction': predictions[x]} for x in range(5)]
item_list

In [None]:
ip_content = {"table": test_data.reset_index().drop(columns = ['index', 'label']).head().to_html(), 
              'Pairs': item_list
             }

In [None]:
import json
humanLoopName = str(uuid.uuid4())

start_loop_response = a2i.start_human_loop(
            HumanLoopName=humanLoopName,
            FlowDefinitionArn=flowDefinitionArn,
            HumanLoopInput={
                "InputContent": json.dumps(ip_content)
            }
        )

### Check Status of Human Loop

In [None]:
resp = a2i.describe_human_loop(HumanLoopName=humanLoopName)
print(f'HumanLoop Name: {humanLoopName}')
print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
print('\n')
    
if resp["HumanLoopStatus"] == "Completed":
    completed_human_loops.append(resp)

### Wait For Workers to Complete Task
Since we are using private workteam, we should go to the labling UI to perform the inspection ourselves.

In [None]:
workteamName = WORKTEAM_ARN[WORKTEAM_ARN.rfind('/') + 1:]
print("Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!")
print('https://' + sagemaker_client.describe_workteam(WorkteamName=workteamName)['Workteam']['SubDomain'])

### Check Status of Human Loop Again

In [None]:
resp = a2i.describe_human_loop(HumanLoopName=humanLoopName)
print(f'HumanLoop Name: {humanLoopName}')
print(f'HumanLoop Status: {resp["HumanLoopStatus"]}')
print(f'HumanLoop Output Destination: {resp["HumanLoopOutput"]}')
print('\n')
    
if resp["HumanLoopStatus"] == "Completed":
    completed_human_loops.append(resp)

### View Task Results  

In [None]:
import re
import pprint

pp = pprint.PrettyPrinter(indent=4)

for resp in completed_human_loops:
    splitted_string = re.split('s3://' +  BUCKET + '/', resp['HumanLoopOutput']['OutputS3Uri'])
    output_bucket_key = splitted_string[1]

    response = s3.get_object(Bucket=BUCKET, Key=output_bucket_key)
    content = response["Body"].read()
    json_output = json.loads(content)
    pp.pprint(json_output)
    print('\n')

### Delete Resources

In [None]:
a2i.stop_human_loop(HumanLoopName=humanLoopName)


In [None]:
a2i.delete_human_loop(HumanLoopName=humanLoopName)

In [None]:
xgb_predictor.delete_endpoint()