# Create an Active Learning Workflow using Amazon SageMaker Ground Truth

### Using this Notebook

Set the kernel to *Python 3* and the image to *TensorFlow 2.10.0 Python 3.9 CPU Optimized* when running this notebook. Select `ml.t3.medium` as the instance type.

In this notebook, we will also use [AWS SAM](https://aws.amazon.com/serverless/sam/) to manage and deploy the AWS CloudFormation templates used for the active learning workflow. The AWS CloudFormation template creates the AWS Lambda functions, AWS Step Functions state machines, and IAM roles that the workflow needs.

While following along with this blog post, we recommend that you leave most of the cells unmodified. However, the notebook will indicate where you can modify variables to create the resources needed for a custom labeling job.

If you plan to customize the Ground Truth labeling job request configuration below, you will also need the resources required to create a labeling job. For more information, see [Use Amazon SageMaker Ground Truth for Data Labeling](https://docs.aws.amazon.com/sagemaker/latest/dg/sms.html). 

Run the code cells in this notebook to configure a Labeling Job request in JSON format. This request JSON can be used in an active learning workflow and will determine how your labeling job task appears to human workers. 

To customize this notebook, modify the cells below and configure the Ground Truth labeling job request (`human_task_config`) to meet your requirements. To learn how to create a Ground Truth labeling job using the Amazon SageMaker API, see [CreateLabelingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateLabelingJob.html).

### Prerequisites

Complete the initial prerequisites described in the lab instructions before running the cells in this notebook. After completing the prerequisites, you should have:
- ARN of your private workforce who will perform human annotation
- Labeling portal sign-in URL for your private workforce to log in

In [None]:
%pip install --upgrade setuptools  aws-sam-cli

#### Setup AWS SAM 

Restart the kernel so that this notebook uses the updated packages.

In [None]:
%%html

<p><b>Restart your kernel for this notebook to use the updated packages.</b></p>
<button class="sm-command-button" data-commandlinker-command="kernelmenu:restart" style="display:none;">Restart Kernel</button>

<script>
try {
    els = document.getElementsByClassName("sm-command-button");
    els[0].click();
}
catch(err) {
    // NoOp
}    
</script>

#### Setup SageMaker environment and variables

In [None]:
import os, sys, json, sagemaker,  pandas as pd, boto3, numpy as np, re
from sagemaker import get_execution_role,image_uris, model_uris, script_uris, hyperparameters
from sagemaker.tensorflow import TensorFlow

sess = sagemaker.Session()

role = get_execution_role()
region = sess.boto_session.region_name
account = sess.boto_session.client("sts").get_caller_identity()["Account"]
smclient = boto3.Session().client('sagemaker')
bucket = sess.default_bucket()
stack_name= "active-learning-stack"
key = "sagemaker-byoal-pretrained"



#### Select a pre-trained model
***
In this lab, we use a pretrained, uncased BERT TensorFlow model. For details on this and the other pretrained models available for transfer learning with the Text Classification - TensorFlow algorithm, see [TensorFlow Hub Models](https://docs.aws.amazon.com/sagemaker/latest/dg/text-classification-tensorflow-Models.html). This model is also available through JumpStart.

A complete list of JumpStart models can also be accessed at [JumpStart Models](https://sagemaker.readthedocs.io/en/stable/doc_utils/jumpstart.html#).
***

In [None]:
model_id = "tensorflow-tc-bert-en-uncased-L-12-H-768-A-12-2"

In [None]:
model_version = "*"  # "*" selects the latest available version of the model
training_instance_type = "ml.p3.2xlarge"
inference_instance_type = "ml.m5.xlarge"

# Retrieve the docker image
train_image_uri = image_uris.retrieve(
    region=None,
    framework=None,
    model_id=model_id,
    model_version=model_version,
    image_scope="training",
    instance_type=training_instance_type,
).replace("/", "\\/")  # escape "/" so the URI can be used inside a sed expression

# Retrieve the training script
train_source_uri = script_uris.retrieve(
    model_id=model_id, model_version=model_version, script_scope="training"
).replace("/", "\\/")  # escape "/" so the URI can be used inside a sed expression


# Retrieve the pre-trained model tarball to further fine-tune
base_model_uri = model_uris.retrieve(
    model_id=model_id, model_version=model_version, model_scope="training"
).replace("/", "\\/")  # escape "/" so the URI can be used inside a sed expression

inference_instance_type = "ml.m5.xlarge"

# Retrieve the inference docker container uri.
deploy_image_uri = image_uris.retrieve(
    region=None,
    framework=None,
    image_scope="inference",
    model_id=model_id,
    model_version=model_version,
    instance_type=inference_instance_type,
).replace("/", "\\/")  # escape "/" so the URI can be used inside a sed expression
print(deploy_image_uri)



#### Build and deploy AWS CloudFormation template using AWS SAM

Update `template.yaml`. AWS SAM uses `template.yaml` to build the backend infrastructure, which consists of the state machine, helper utilities, and AWS Lambda functions.

Update the configuration variables for the active learning process:

- `byom`: Bring your own model. This parameter determines whether the Lambda function returns the Amazon SageMaker BlazingText algorithm or your own custom model Docker image. It is set to `false` because we are using a pretrained TensorFlow BERT model.
- `byomimage`: Your own custom model image. It is set to `NotApplicable` because we are using a pretrained BERT model.
- `sagemaker_program`: The training script used to train your model. Here we use a transfer learning script to fine-tune the pretrained BERT model, so this parameter is set to the script name `transfer_learning.py`.
- `sagemaker_submit_directory`: The location of the training source code and any dependencies other than the entry point file. It is set to the S3 URI where the training script is hosted.
- `pretrain_model`: Whether the active learning process uses a pretrained model. It is set to `true` because we are using a pretrained BERT model.
- `pretrain_algo_train_repo`: The training image URI of the pretrained BERT algorithm. It is set to the training image URI retrieved above.
- `base_model_uri`: The URI of the pretrained BERT model that will be fine-tuned with custom data. It is set to the pretrained model URI retrieved above.
- `pretrain_algo_inference_repo`: The inference image URI of the pretrained BERT algorithm. It is set to the inference image URI retrieved above.


Update the batch strategy to `SingleRecord` in the `CreateTransformJob` step of the ActiveLearning-active-learning-stack state machine, because the TensorFlow algorithm used here supports `SingleRecord`:
- `BatchStrategy`: `SingleRecord`
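The `sed` commands in the next cell each replace the value of the first matching `key: value` line in `template.yaml`. The same idea can be sketched in Python for readers less familiar with `sed`. The helper name `set_first_yaml_value` and the sample text are hypothetical, and the sketch treats the template as plain text rather than parsing YAML.

```python
import re


def set_first_yaml_value(text: str, key: str, value: str) -> str:
    """Replace the value on the first `key: ...` line, keeping its indentation."""
    pattern = re.compile(rf"^([ \t]*{re.escape(key)}: *).*$", flags=re.MULTILINE)
    # A callable replacement inserts `value` literally, so URIs that contain
    # "/" or "\" need no extra escaping (unlike the sed version).
    return pattern.sub(lambda match: match.group(1) + value, text, count=1)


# Hypothetical fragment, used only to illustrate the behavior.
sample = (
    "Environment:\n"
    "  Variables:\n"
    "    byom: 'true'\n"
    "    byomimage: some-image\n"
    "    byom: 'true'\n"
)
updated = set_first_yaml_value(sample, "byom", "false")
print(updated)
```

Only the first `byom:` line changes; `byomimage:` is untouched because the pattern requires the colon directly after the key.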

In [None]:
%%bash -s  "$train_image_uri"  "$train_source_uri"  "$base_model_uri"  "$deploy_image_uri"

cd src

# Update byom value to False 
sed  -i  "0,/^\([[:space:]]*byom: *\).*/s//\1"false"/"  template.yaml

#Update byomimage value to NotApplicable
sed  -i  "0,/^\([[:space:]]*byomimage: *\).*/s//\1"NotApplicable"/"   template.yaml

#Update Pretrained Model flag to True
sed  -i  "0,/^\([[:space:]]*pretrain_model: *\).*/s//\1"true"/"   template.yaml

#Update Training script for PreTrained model
sed  -i  "0,/^\([[:space:]]*sagemaker_program: *\).*/s//\1"transfer_learning.py"/"   template.yaml

#Update Pretrained algorithm training repository
sed  -i  "0,/^\([[:space:]]*pretrain_algo_train_repo: *\).*/s//\1"${1}"/"   template.yaml

#Update SageMaker submit directory 
sed  -i  "0,/^\([[:space:]]*sagemaker_submit_directory: *\).*/s//\1"${2}"/"   template.yaml

#Update base pretrained model URI
sed  -i  "0,/^\([[:space:]]*base_model_uri: *\).*/s//\1"${3}"/"   template.yaml

#Update Pretrained algorithm inference repository
sed  -i  "0,/^\([[:space:]]*pretrain_algo_inference_repo: *\).*/s//\1"${4}"/"   template.yaml

#Update BatchStrategy to SingleRecord
sed -i 's/MultiRecord/SingleRecord/g'  template.yaml

cat template.yaml



In [None]:
%%bash -s  "$region"  "$stack_name"  "$bucket"

# Change directory to src folder
cd src

#Delete SAM build folder to clean up any existing SAM templates
rm  -rf  .aws-sam 

#Build AWS SAM artifacts to deploy AWS CloudFormation template
sam build

#Deploy AWS SAM template
sam deploy  --region ${1}  --stack-name ${2}  --s3-bucket ${3}  --capabilities CAPABILITY_IAM CAPABILITY_AUTO_EXPAND



#### Prepare labeling input manifest file

We will create an input manifest file for our active learning workflow using the newsCorpora.csv file from the [UCI News Dataset](https://archive.ics.uci.edu/ml/datasets/News+Aggregator). This dataset contains a list of about 420,000 articles that fall into one of four categories: Business (b), Science & Technology (t), Entertainment (e) and Health & Medicine (m). We will randomly choose 10,000 articles from that file to create our dataset.

For the active learning loop to start, 20% of the data must be labeled. To quickly test the active learning component, we will include 20% (`labeled_count`) of the original labels provided in the dataset in our input manifest. We use this partially-labeled dataset as the input to the active learning loop.
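As a minimal sketch of the 20% split described above, the following builds manifest lines for a toy dataset: the first fifth of the records keep their label and metadata, and the rest carry only `source`. The helper `build_manifest_lines` and the toy titles are hypothetical; the metadata fields mirror the ones this notebook writes.

```python
import json

# Metadata attached to every pre-labeled record, mirroring this notebook.
LABEL_METADATA = {
    "confidence": 1.0,
    "human-annotated": "yes",
    "type": "groundtruth/text-classification",
}


def build_manifest_lines(records, label_attribute="category"):
    """Return JSON Lines strings for a list of (title, class_index) pairs."""
    labeled_count = int(len(records) / 5)  # the first 20% keep their labels
    lines = []
    for position, (title, class_index) in enumerate(records):
        entry = {"source": title}
        if position < labeled_count:
            entry[label_attribute] = int(class_index)
            entry[label_attribute + "-metadata"] = dict(LABEL_METADATA)
        lines.append(json.dumps(entry))
    return lines


# Made-up records, for illustration only.
toy_records = [(f"toy title {i}", i % 4) for i in range(10)]
manifest_lines = build_manifest_lines(toy_records)
for line in manifest_lines[:3]:
    print(line)
```

With ten toy records, two lines are labeled and eight are unlabeled.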

In [None]:
! wget -nc https://archive.ics.uci.edu/ml/machine-learning-databases/00359/NewsAggregatorDataset.zip --no-check-certificate && unzip -o NewsAggregatorDataset.zip

In [None]:
column_names = ["TITLE", "URL", "PUBLISHER", "CATEGORY", "STORY", "HOSTNAME", "TIMESTAMP"]
manifest_file = "partially-labeled.manifest"
news_data_all = pd.read_csv("newsCorpora.csv", names=column_names, header=None, delimiter="\t")
news_data = news_data_all.sample(n=10000, random_state=42)
news_data = news_data[["TITLE", "CATEGORY"]]
news_data 

We will clean our data set using *pandas*.
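To make the cleaning steps concrete, here is a rough standard-library sketch of what they do to a single title: remove double quotes, drop every character that is neither a word character nor whitespace, and keep only the first line. The function `clean_title` and the sample title are illustrative assumptions, not part of the notebook.

```python
import re


def clean_title(title: str) -> str:
    """Apply the same three cleaning steps to one title string."""
    title = title.replace('"', "")          # remove double quotes
    title = re.sub(r"[^\w\s]", "", title)   # remove punctuation and symbols
    return title.split("\n")[0]             # keep only the first line


# Made-up title, for illustration only.
cleaned = clean_title('Fed\'s "taper" talk: stocks fall 3%\nsecond line')
print(cleaned)  # Feds taper talk stocks fall 3
```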

In [None]:
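# Assign the result back instead of using inplace=True on a column selection,
# which may not modify news_data in recent pandas versions.
news_data["TITLE"] = news_data["TITLE"].replace('"', "", regex=True)
news_data["TITLE"] = news_data["TITLE"].replace(r"[^\w\s]", "", regex=True)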
news_data["TITLE"] = news_data["TITLE"].str.split("\n").str[0]
news_data["CATEGORY_LABELS"]=news_data["CATEGORY"]
news_data["CATEGORY"] = news_data["CATEGORY"].astype("category").cat.codes
news_data

In [None]:
fixed = news_data["TITLE"].str.lower().replace('"', "")

The following cell will create our partially-labeled input manifest file, and push it to our S3 bucket. 

In [None]:
import json

total = len(news_data)
labeled_count = int(total / 5)  # 20% of the dataset is labeled.
label_map = {
    "b": "Business",
    "e": "Entertainment",
    "m": "Health & Medicine",
    "t": "Science and Technology",
}
labeled_series = pd.Series(
    data=news_data.iloc[:labeled_count].TITLE.values,
    index=news_data.iloc[:labeled_count].CATEGORY.values,
)
annotation_metadata = b"""{ "category-metadata" : { "confidence": 1.0, "human-annotated": "yes", "type": "groundtruth/text-classification"} }"""
annotation_metadata_dict = json.loads(annotation_metadata)
with open(manifest_file, "w") as outfile:
    for items in labeled_series.items():
        labeled_record = dict()
        labeled_record["source"] = items[1]
        labeled_record["category"] = int(items[0])
        labeled_record.update(annotation_metadata_dict)
        outfile.write(json.dumps(labeled_record) + "\n")

unlabeled_series = pd.Series(
    data=news_data.iloc[labeled_count:].TITLE.values,
    index=news_data.iloc[labeled_count:].CATEGORY.values,
)
with open(manifest_file, "a") as outfile:
    for items in unlabeled_series.items():
        # json.dumps escapes any special characters left in the title
        outfile.write(json.dumps({"source": items[1]}) + "\n")

boto3.resource("s3").Bucket(bucket).upload_file(manifest_file, key + "/" + manifest_file)
manifest_file_uri = "s3://{}/{}".format(bucket, key + "/" + manifest_file)

In [None]:
# Use s3 client to upload relevant json strings to s3.
s3_client = boto3.client("s3")

This cell will specify the labels that workers will use to categorize the articles. To customize your labeling job, add your own labels here. To learn more, see [LabelCategoryConfigS3Uri](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateLabelingJob.html#sagemaker-CreateLabelingJob-request-LabelCategoryConfigS3Uri).
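If you add your own labels, generating the file from a Python list avoids hand-editing JSON. The following is a minimal sketch; the helper `build_label_category_config` is hypothetical, and the `document-version` value is the one used in this notebook.

```python
import json


def build_label_category_config(labels):
    """Return the label category configuration as a JSON string."""
    config = {
        "document-version": "2018-11-28",
        "labels": [{"label": name} for name in labels],
    }
    return json.dumps(config, indent=4)


label_config = build_label_category_config(
    ["Business", "Entertainment", "Health & Medicine", "Science and Technology"]
)
print(label_config)
```

The resulting string could be passed as `Body` to `put_object`, in the same way the hand-written `label_file` string is uploaded in the next cell.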

In [None]:
label_file_name = "class_labels.json"
label_file = """{
    "document-version": "2018-11-28",
    "labels": [
        {
            "label": "Business"
        },
        {
            "label": "Entertainment"
        },
        {
            "label": "Health & Medicine"
        },
        {
            "label": "Science and Technology"
        }
    ]
}"""

s3_client.put_object(Body=label_file, Bucket=bucket, Key=key + "/" + label_file_name)
label_file_uri = "s3://{}/{}".format(bucket, key + "/" + label_file_name)

####  Prepare custom worker task template 

The following cell will specify our custom worker task template. This template will configure the UI that workers will see when they open our text classification labeling job tasks. To learn how to customize this cell, see  [Creating your custom labeling task template](https://docs.aws.amazon.com/sagemaker/latest/dg/sms-custom-templates-step2.html).

In [None]:
template_file_name = "instructions.template"
template_file = r"""
<script src="https://assets.crowd.aws/crowd-html-elements.js"></script>
<crowd-form>
  <crowd-classifier
    name="crowd-classifier"
    categories="{{ task.input.labels | to_json | escape }}"
    header="Select the news title corresponding to the 4 categories. (b) for Business, (e) for Entertainment, (m) for Health and Medicine and (t) for Science and Technology."
  >
    <classification-target> {{ task.input.taskObject }} </classification-target>
    <full-instructions header="Classifier instructions">
      <ol><li><strong>Read</strong> the text carefully.</li><li><strong>Read</strong> the examples to understand more about the options.</li><li><strong>Choose</strong> the appropriate label that best suits the text.</li></ol>
    </full-instructions>
    <short-instructions>
      <p>Example Business title:</p><p>US open: Stocks fall after Fed official hints at accelerated tapering.</p><p><br>
      </p><p>Example Entertainment title:</p><p>CBS negotiates three more seasons for The Big Bang Theory</p><p><br>
      </p><p>Example Health & Medicine title:</p><p>Blood Test Could Predict Alzheimer's. Good News? </p><p><br>
      </p><p>Example Science and Technology (t) title:</p><p>Elephants tell human friend from foe by voice.</p><p><br>
      </p>
    </short-instructions>
  </crowd-classifier>
</crowd-form>
"""

s3_client.put_object(Body=template_file, Bucket=bucket, Key=key + "/" + template_file_name)
template_file_uri = "s3://{}/{}".format(bucket, key + "/" + template_file_name)

In this lab, we use a private workforce to label the dataset. To use a private work team to label your data objects, set `USE_PRIVATE_WORKFORCE` to `True` and set `private_workteam_arn` to the work team ARN that you copied in the *Prerequisites* section earlier. Your private workforce must be in the same AWS Region as your labeling job request. To learn more, see [Use a Private Workforce](https://docs.aws.amazon.com/sagemaker/latest/dg/sms-workforce-private.html).

In [None]:
USE_PRIVATE_WORKFORCE = True
private_workteam_arn = "Enter your Private Worker ARN"

This cell will automatically configure a public workforce ARN and pre- and post-annotation ARNs (`prehuman_arn` and `acs_arn` respectively). If `USE_PRIVATE_WORKFORCE` is `False` a public workforce will be used to create your labeling job request. 

To customize your labeling job task type, you will need to modify `prehuman_arn` and `acs_arn`. 

If you are using one of the Ground Truth built-in task types, you can find pre- and post-annotation Lambda ARNs using the following links. 
* Pre-annotation Lambda ARNs for built-in task types can be found in [HumanTaskConfig](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_HumanTaskConfig.html#API_HumanTaskConfig_Contents).
* Post-annotation Lambda ARNs (Annotation Consolidation Lambda) for built-in task types can be found in [AnnotationConsolidationConfig](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_AnnotationConsolidationConfig.html#sagemaker-Type-AnnotationConsolidationConfig-AnnotationConsolidationLambdaArn).

If you are creating a custom labeling job task, see [Step 3: Processing with AWS Lambda](https://docs.aws.amazon.com/sagemaker/latest/dg/sms-custom-templates-step3.html) to learn how to create custom pre- and post-annotation Lambda functions.
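The lookup in the next cell covers only five Regions, so running it elsewhere raises a bare `KeyError`. As a sketch, the hypothetical helper `text_multiclass_lambda_arns` below builds the same two ARNs but fails with a clearer message. The account IDs are copied from this notebook; for any other Region, take the ARNs from the API reference pages linked above.

```python
# Account IDs copied from this notebook's Region map.
AC_ACCOUNT_BY_REGION = {
    "us-west-2": "081040173940",
    "us-east-1": "432418664414",
    "us-east-2": "266458841044",
    "eu-west-1": "568282634449",
    "ap-northeast-1": "477331159723",
}


def text_multiclass_lambda_arns(region):
    """Return (pre-annotation ARN, consolidation ARN) for text classification."""
    try:
        account_id = AC_ACCOUNT_BY_REGION[region]
    except KeyError:
        raise ValueError(
            f"Region {region!r} is not in this notebook's map; "
            "look up the Lambda ARNs in the HumanTaskConfig API reference."
        ) from None
    prefix = f"arn:aws:lambda:{region}:{account_id}:function"
    return f"{prefix}:PRE-TextMultiClass", f"{prefix}:ACS-TextMultiClass"


pre_arn, consolidation_arn = text_multiclass_lambda_arns("us-east-1")
print(pre_arn)
print(consolidation_arn)
```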

In [None]:
# Specify ARNs for resources needed to run a text classification job.
ac_arn_map = {
    "us-west-2": "081040173940",
    "us-east-1": "432418664414",
    "us-east-2": "266458841044",
    "eu-west-1": "568282634449",
    "ap-northeast-1": "477331159723",
}

public_workteam_arn = "arn:aws:sagemaker:{}:394669845002:workteam/public-crowd/default".format(
    region
)
prehuman_arn = "arn:aws:lambda:{}:{}:function:PRE-TextMultiClass".format(region, ac_arn_map[region])
acs_arn = "arn:aws:lambda:{}:{}:function:ACS-TextMultiClass".format(region, ac_arn_map[region])

The following cell specifies our labeling job name, the description workers see, and tags that workers can use to find our labeling job task.

In [None]:
job_name_prefix = "byoal-news"
task_description = "Classify news title to one of these 4 categories."
task_keywords = ["text", "classification", "humans", "news"]
task_title = task_description

#### Labeling job request

Modify the following request to customize your labeling job request. For more information on the parameters below, see [CreateLabelingJob](https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_CreateLabelingJob.html).
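When a public workforce is used, the request expresses the task price as three integer fields (`Dollars`, `Cents`, `TenthFractionsOfACent`) rather than one decimal amount. As a minimal sketch, the hypothetical helper `task_price_in_usd` (not part of boto3 or the SageMaker SDK) converts such a block to dollars so that you can sanity-check the per-task cost before submitting the request.

```python
def task_price_in_usd(public_workforce_task_price):
    """Convert a PublicWorkforceTaskPrice block to a dollar amount."""
    amount = public_workforce_task_price["AmountInUsd"]
    # Sum in tenths of a cent (integers) before dividing, to limit rounding error.
    tenth_cents = (
        amount.get("Dollars", 0) * 1000
        + amount.get("Cents", 0) * 10
        + amount.get("TenthFractionsOfACent", 0)
    )
    return tenth_cents / 1000


# Same values as the public-workforce branch of this notebook's request.
price = {"AmountInUsd": {"Dollars": 0, "Cents": 1, "TenthFractionsOfACent": 2}}
per_task = task_price_in_usd(price)
print(f"${per_task:.3f} per task")  # $0.012 per task
```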

In [None]:
human_task_config = {
    "AnnotationConsolidationConfig": {
        "AnnotationConsolidationLambdaArn": acs_arn,
    },
    "PreHumanTaskLambdaArn": prehuman_arn,
    "MaxConcurrentTaskCount": 50,  # 50 texts will be sent at a time to the workteam.
    "NumberOfHumanWorkersPerDataObject": 1,  # 1 worker will be enough to label each text.
    "TaskAvailabilityLifetimeInSeconds": 21600,  # Your work team has 6 hours to complete all pending tasks.
    "TaskDescription": task_description,
    "TaskKeywords": task_keywords,
    "TaskTimeLimitInSeconds": 300,  # Each text must be labeled within 5 minutes.
    "TaskTitle": task_title,
    "UiConfig": {
        "UiTemplateS3Uri": template_file_uri,
    },
}

if not USE_PRIVATE_WORKFORCE:
    human_task_config["PublicWorkforceTaskPrice"] = {
        "AmountInUsd": {
            "Dollars": 0,
            "Cents": 1,
            "TenthFractionsOfACent": 2,
        }
    }
    human_task_config["WorkteamArn"] = public_workteam_arn
else:
    human_task_config["WorkteamArn"] = private_workteam_arn

ground_truth_request = {
    "InputConfig": {
        "DataSource": {
            "S3DataSource": {
                "ManifestS3Uri": manifest_file_uri,
            }
        },
        "DataAttributes": {
            "ContentClassifiers": ["FreeOfPersonallyIdentifiableInformation", "FreeOfAdultContent"]
        },
    },
    "OutputConfig": {
        "S3OutputPath": "s3://{}/{}/output/".format(bucket, key),
    },
    "HumanTaskConfig": human_task_config,
    "LabelingJobNamePrefix": job_name_prefix,
    "RoleArn": role,
    "LabelAttributeName": "category",
    "LabelCategoryConfigS3Uri": label_file_uri,
}

In [None]:
print(json.dumps(ground_truth_request, indent=2))

step_func_input=json.dumps(ground_truth_request,default=str)

In [None]:
#Get Stepfunction Client and start execution
sfn=boto3.client('stepfunctions', region_name=region)
stateMachineArn="arn:aws:states:{}:{}:stateMachine:ActiveLearningLoop-active-learning-stack".format(region,account)
response = sfn.start_execution(
    stateMachineArn=stateMachineArn,
    input=step_func_input
    
)

executionArn=response["executionArn"] 

print(json.dumps(response, indent=2, default=str))

Refer to the lab instructions for the next steps: viewing the active data labeling workflow and the human annotation workflow.

On successful completion of the active learning loop and the human annotation workflow, the state machine outputs the final output manifest file and the latest trained model.
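To check for completion from the notebook instead of the console, you could poll the execution status. The sketch below is one way to do that under stated assumptions: `wait_for_execution` is a hypothetical helper that takes the `describe_execution` callable as an argument, so it can be demonstrated here with a stand-in function rather than a real AWS call.

```python
import time

# Statuses after which a Step Functions execution no longer changes.
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "TIMED_OUT", "ABORTED"}


def wait_for_execution(describe_execution, execution_arn, poll_seconds=60, max_polls=120):
    """Poll until the execution reaches a terminal status and return that status."""
    for _ in range(max_polls):
        status = describe_execution(executionArn=execution_arn)["status"]
        if status in TERMINAL_STATES:
            return status
        time.sleep(poll_seconds)
    raise TimeoutError(f"Execution still running after {max_polls} polls")


# Stand-in for sfn.describe_execution, used only to demonstrate the loop.
_fake_statuses = iter(["RUNNING", "RUNNING", "SUCCEEDED"])


def fake_describe_execution(executionArn):
    return {"status": next(_fake_statuses)}


final_status = wait_for_execution(fake_describe_execution, "example-arn", poll_seconds=0)
print(final_status)  # SUCCEEDED
```

In this notebook the call would be `wait_for_execution(sfn.describe_execution, executionArn)`, using the client and execution ARN created earlier.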

Run the cell below after the active learning loop and the human annotation workflow complete successfully.

In [None]:
# Clean up 

# Stop Active Learning state machine
if (sfn.describe_execution(executionArn=executionArn)["status"]=="RUNNING" ):
    sfn.stop_execution(executionArn=executionArn)
    print('State Machine Execution Stopped :  ' + executionArn)

# Stop Ground Truth Labeling jobs that are InProgress or Initializing
response = smclient.list_labeling_jobs(NameContains="byoal-news")
for job in response["LabelingJobSummaryList"]:
    if job["LabelingJobStatus"] in ("InProgress", "Initializing"):
        label_job_name = job["LabelingJobName"]
        smclient.stop_labeling_job(LabelingJobName=label_job_name)
        print("Stopping Labeling Job:  " + label_job_name)

Delete the AWS CloudFormation stack created by AWS SAM.

In [None]:
%%bash -s  "$region"  "$stack_name"

#Delete CloudFormation Stack 
sam delete  --region ${1}  --stack-name ${2}  --no-prompts