{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Video Moderation with Human-in-the-loop\n",
"\n",
"Amazon Augmented AI (Amazon A2I) makes it easy to build the workflows required for human review of ML predictions. Amazon A2I brings human review to all developers, removing the undifferentiated heavy lifting associated with building human review systems or managing large numbers of human reviewers. \n",
"\n",
"Amazon A2I allows you to create your workflows for ML models built on Amazon SageMaker or any other customized workflow. Using Amazon A2I, you can allow human reviewers to step in when a model cannot make a high-confidence prediction or audit its predictions on an ongoing basis. Learn more [here](https://aws.amazon.com/augmented-ai/).\n",
"\n",
"In this tutorial, we will show how you can use Amazon A2I to customize a UI template and use it to review Rekognition's Video moderation result. \n",
"\n",
"For more in depth instructions, visit this [website](https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-getting-started.html).\n",
"\n",
"\n",
"\n",
"- [Step 1: Setup Notebook](#step1)\n",
"- [Step 2: Moderate video using Rekognition video moderation API](#step2)\n",
"- [Step 3: Set up A2I for human review](#step3)\n",
" - [Step 3.1: Create Work Team](#step31)\n",
" - [Step 3.2: Create Human Task UI](#step32)\n",
" - [Step 3.3: Creating the Flow Definition](#step33)\n",
"- [Step 4: Moderate image with human-in-the-loop](#step4)\n",
"- [Step 5: Check A2I generated result in S3](#step5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Step 1: Setup Notebook \n",
"Run the below cell to install/update Python dependencies if you run the lab using a local IDE. It is optional if you use a SageMaker Studio Juypter Notebook, which already includes the dependencies in the kernel. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Install/upgrade dependencies\n",
"%pip install -qU pip\n",
"%pip install boto3 -qU\n",
"%pip install IPython -qU"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import boto3\n",
"import botocore\n",
"import sagemaker as sm\n",
"import os\n",
"import datetime\n",
"from IPython.display import Video\n",
"from IPython.display import HTML, display\n",
"import uuid\n",
"import json\n",
"import time\n",
"\n",
"# variables\n",
"data_bucket = sm.Session().default_bucket()\n",
"region = boto3.session.Session().region_name\n",
"\n",
"os.environ[\"BUCKET\"] = data_bucket\n",
"os.environ[\"REGION\"] = region\n",
"role = sm.get_execution_role()\n",
"\n",
"print(f\"SageMaker role is: {role}\\nDefault SageMaker Bucket: s3://{data_bucket}\")\n",
"\n",
"s3=boto3.client('s3', region_name=region)\n",
"rekognition=boto3.client('rekognition', region_name=region)\n",
"sagemaker = boto3.client('sagemaker', region_name=region)\n",
"a2i_runtime_client = boto3.client('sagemaker-a2i-runtime', region_name=region)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"For this lab, we will send a moderation demo video located at datasets/moderation-video.mp4 to Rekognition. Run the below cell to preview the video."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"Video('../datasets/moderation-video.mp4')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Now, let's upload the video to the default S3 bucket for Rekognition to access."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"s3_key = 'content-moderation-im/video-moderation/moderation-video.mp4'\n",
"s3.upload_file('../datasets/moderation-video.mp4', data_bucket, s3_key)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Step 2: Moderate video using Rekognition moderation API \n",
"Call Rekognition Content Moderation API to detect inappropriate information in the video. Rekognition Video moderation API is an asynchronize API that will start a job managed by Rekognition. We will receive a job_id back when we start the job. Then use iteration logic to heart-beat the get_content_moderation API to check job status until the job completes."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"startModerationLabelDetection = rekognition.start_content_moderation(\n",
" Video={\n",
" 'S3Object': {\n",
" 'Bucket': data_bucket,\n",
" 'Name': s3_key,\n",
" }\n",
" },\n",
" #MinConfidence=min_confidence,\n",
")\n",
"\n",
"moderationJobId = startModerationLabelDetection['JobId']\n",
"display(\"Job Id: {0}\".format(moderationJobId))\n",
"\n",
"getContentModeration = rekognition.get_content_moderation(\n",
" JobId=moderationJobId,\n",
" SortBy='TIMESTAMP'\n",
")\n",
"\n",
"while(getContentModeration['JobStatus'] == 'IN_PROGRESS'):\n",
" time.sleep(5)\n",
" print('.', end='')\n",
"\n",
" getContentModeration = rekognition.get_content_moderation(\n",
" JobId=moderationJobId,\n",
" SortBy='TIMESTAMP')\n",
"\n",
"display(getContentModeration['JobStatus'])"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"getContentModeration"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As we can see in the Rekognition response, the get_content_moderation API returns a list of labels under the \"ModerationLabels\" field, a list of items each contains:\n",
"- Timestamp (millisecond): when inapproperiate information found within the video\n",
"- ModerationLabel:\n",
" - Confidence: Confidence score of the label\n",
" - Name: second level moderation category name\n",
" - ParentName: top-level moderation category name\n",
"\n",
"There are multiple items returned from Rekognition, and hard to review this result by looking at the data directly. In the next step, we will set up A2I so the human reviewers can check these results quickly."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Step 3: Set up A2I for human review "
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"#### Setup Bucket CORS\n",
"\n",
"**Important**: The bucket you specify for `data_bucket` must have CORS enabled. You can enable CORS by adding a policy similar to the following to your Amazon S3 bucket. To learn how to add CORS to an S3 bucket, see [CORS Permission Requirement](https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-permissions-security.html#a2i-cors-update) in the Amazon A2I documentation. \n",
"\n",
"\n",
"```\n",
"[{\n",
" \"AllowedHeaders\": [],\n",
" \"AllowedMethods\": [\"GET\"],\n",
" \"AllowedOrigins\": [\"*\"],\n",
" \"ExposeHeaders\": []\n",
"}]\n",
"```\n",
"\n",
"If you do not add a CORS configuration to the S3 buckets that contains your image input data, human review tasks for those input data objects will fail. \n",
"\n",
"Print out the default S3 bucket name using the below cell if you don't know which bucket is in use."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"print('The default SageMaker Studio S3 bucket name: ' + data_bucket)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Run the below cell to enable CORS to the default S3 bucket"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"cors_configuration = {\n",
" 'CORSRules': [{\n",
" \"AllowedHeaders\": [],\n",
" \"AllowedMethods\": [\"GET\"],\n",
" \"AllowedOrigins\": [\"*\"],\n",
" \"ExposeHeaders\": []\n",
" }]\n",
"}\n",
"\n",
"s3.put_bucket_cors(Bucket=data_bucket,\n",
" CORSConfiguration=cors_configuration)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"You can execute all the cells in Step 3 to provision A2I components programmatically. Or you can follow the [instruction in the Content Moderation Workshop](https://catalog.workshops.aws/content-moderation/02-video-moderation/02-video-moderation-with-a2i) to set up them using the AWS console if you have already done the setups through the console. You can copy/paste the workflow ARN to the below cell and run it, then proceed to Step 4."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Copy/paste the workflow ARN to set the below variable if you have already done A2I setup using the AWS console\n",
"workflow_definition_arn = ''"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 3.1: Create Work Team \n",
" A workforce is the group of workers that you have selected to label your dataset. You can choose either the Amazon Mechanical Turk workforce, a vendor-managed workforce, or you can create your own private workforce for human reviews. Whichever workforce type you choose, Amazon Augmented AI takes care of sending tasks to workers. When you use a private workforce, you also create work teams, a group of workers from your workforce that are assigned to Amazon Augmented AI human review tasks. You can have multiple work teams and can assign one or more work teams to each job. \n",
"\n",
"This lab assumes that you already have the workforce team in the same region. If you don't have a workforce team in the current region, follow [Step 1](https://catalog.us-east-1.prod.workshops.aws/workshops/1ece9ffd-4c24-4e66-b42a-0c0e13b0f668/en-US/content-moderation/02-video-moderation/02-video-moderation-with-a2i#step-1:-create-a-private-team-in-aws-console-(you-can-skip-this-step-if-you-already-have-a-private-work-team-in-the-region)) and [Step 2](https://catalog.us-east-1.prod.workshops.aws/workshops/1ece9ffd-4c24-4e66-b42a-0c0e13b0f668/en-US/content-moderation/02-video-moderation/02-video-moderation-with-a2i#step-2:-activate-a2i-user-account) on the Workshop website to set it up in the AWS console.\n",
"\n",
"[More documents](https://docs.aws.amazon.com/sagemaker/latest/dg/sms-workforce-management.html ) about how to set up a workforce team for A2I."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# get the existing workforce arn\n",
"work_team_arn = sagemaker.list_workteams()[\"Workteams\"][0][\"WorkteamArn\"]\n",
"work_team_arn"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 3.2: Create Human Task UI \n",
"Create a human task UI resource, giving a UI template in liquid HTML. This template will be rendered to the human workers whenever a human loop is required.\n",
"We will use a customized UI template that is compatible with the dictionary data will prepare in the following steps.\n",
"The customized UI template locates in the same folder: a2i-video-moderation-custom-template.html. It is a static web page with HTML, CSS and JavaScripts. You can open the file and check the detailed logic there."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# read the UI template from the HTML file\n",
"template = \"\"\n",
"with open('a2i-video-moderation-custom-template.html','r') as f:\n",
" template = f.read()\n",
"\n",
"human_task_name = \"a2i-rekognition-video-moderation-custom-ui\"\n",
"\n",
"# Create A2I UI template\n",
"resp = sagemaker.create_human_task_ui(\n",
" HumanTaskUiName=human_task_name,\n",
" UiTemplate={'Content': template})"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Keep the new UI template ARN in a variable\n",
"ui_template_arn = resp[\"HumanTaskUiArn\"]\n",
"ui_template_arn"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"## Step 3.3: Creating the Flow Definition \n",
"In this section, we're going to create a flow definition. Flow Definitions allow us to specify:\n",
"- The workforce that your tasks will be sent to. \n",
"- The instructions that your workforce will receive. This is called a worker task template. \n",
"- The configuration of your worker tasks, including the number of workers that receive a task and time limits to complete tasks. Where your output data will be stored.\n",
"\n",
"This demo is going to use the API, but you can optionally create this workflow definition in the console as well. For more details and instructions, see [this document](https://docs.aws.amazon.com/sagemaker/latest/dg/a2i-create-flow-definition.html)."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"flow_def_name = \"content-moderation-im-video-moderation-custom-ui-workflow\"\n",
"\n",
"resp = sagemaker.create_flow_definition(\n",
" FlowDefinitionName= flow_def_name,\n",
" RoleArn= role,\n",
" HumanLoopConfig= {\n",
" \"WorkteamArn\": work_team_arn,\n",
" \"HumanTaskUiArn\": ui_template_arn,\n",
" \"TaskCount\": 1,\n",
" \"TaskDescription\": \"A2I Rekognition video moderation workflow\",\n",
" \"TaskTitle\": \"Content Moderation Video Human Loop Task\"\n",
" },\n",
" OutputConfig={\n",
" \"S3OutputPath\" : f's3://{data_bucket}/a2i/output/'\n",
" }\n",
" )\n",
"\n",
"workflow_definition_arn = resp['FlowDefinitionArn']\n",
"workflow_definition_arn"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Step 4: Moderate video with human review \n",
"\n",
"The A2I custom UI template allows us to decide when to trigger A2I. It allows us to customize the business rule to support more complex business requirements. In the below code, we will start A2I only when the moderation top category is `Alcoholic Beverages` or `Barechested Male` or `Drinking` with a confidence score greater than 50%."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# A function to prepare the data in the format the custom UI expects\n",
"# This function will only include the labels in \"Alcoholic Beverages\",\"Barechested Male\" or \"Drinking\" and confidence > 80%\n",
"def generate_a2i_payload(rek_response):\n",
" failed_conditions = []\n",
" for item in rek_response[\"ModerationLabels\"]:\n",
" if item[\"ModerationLabel\"][\"Name\"] in [\"Alcoholic Beverages\",\"Barechested Male\", \"Drinking\"] and item[\"ModerationLabel\"][\"Confidence\"] > 80:\n",
" failed_conditions.append(item)\n",
" \n",
" return {\n",
" \"InputContent\": json.dumps({\n",
" \"Results\": {\n",
" \"ConditionMissed\": failed_conditions,\n",
" },\n",
" \"s3\":{\n",
" \"url\": f's3://{data_bucket}/{s3_key}'\n",
" },\n",
" \"VideoName\": \"Tear of Steel Trailer\",\n",
" })\n",
" }"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# the data will send to A2I\n",
"a2i_payload = generate_a2i_payload(getContentModeration)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"display(a2i_payload)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"As you can see, the function filtered the moderation labels and only kept 2 of them in the list. We will send this list of moderated labels to A2I for human review. \n",
"\n",
"We now start an A2I human review task by sending this dataset and the other information to the API."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"human_loop_name = f'cm-im-video-{str(uuid.uuid4())}'\n",
"\n",
"start_loop_response = a2i_runtime_client.start_human_loop(\n",
" HumanLoopName=human_loop_name,\n",
" FlowDefinitionArn=workflow_definition_arn,\n",
" HumanLoopInput=a2i_payload)\n",
"human_loop_arn = start_loop_response[\"HumanLoopArn\"]\n"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Check Human Loop status"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"a2i_response = a2i_runtime_client.describe_human_loop(HumanLoopName=human_loop_name)\n",
"print(f'\\nHuman Loop Name: {human_loop_name}')\n",
"print(f'Human Loop Status: {a2i_response[\"HumanLoopStatus\"]}')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"A human loop task is now sent to A2I and ready for us to review. You can find the A2I console URL from SageMaker console under Augment AI -> Human review workforces under the \"private\" tab. \n",
"\n",
"Or use the below code to print out the A2I console URL:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"workteamName = work_team_arn[work_team_arn.rfind('/') + 1:]\n",
"print(\"Navigate to the private worker portal and do the tasks. Make sure you've invited yourself to your workteam!\")\n",
"print('https://' + sagemaker.describe_workteam(WorkteamName=workteamName)['Workteam']['SubDomain'])"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"Login to the A2I console using the username/password you received earlier when setting up the Workforce team. You should see the task \"Content Moderation Video Human Loop Task\" in the list. Choose the item and click on the \"start working\" button on the top right to start viewing the moderation result. You will see a review page like the below screenshot:\n",
"\n",
"\n",
"\n",
"This UI is rendered using the customized template we imported in the previous steps. You can modify the HTML to change the layout or UI logic of the page. \n",
"\n",
"As a reviewer, you can review the moderation categories, then click on the \"Submit\" button. A2I will store the reviewer input in the S3 output folder.\n",
"\n",
"Now, the human loop status should updated to \"completed\"."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"a2i_resp = a2i_runtime_client.describe_human_loop(HumanLoopName=human_loop_name)\n",
"a2i_resp"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Step 5 Check A2I generated JSON \n",
"Now, let's download the A2I output file and print it out:"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"s3.download_file(data_bucket, a2i_resp[\"HumanLoopOutput\"][\"OutputS3Uri\"].replace(f's3://{data_bucket}/',''), 'a2i-output-video-moderation.json')"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"The `humanAnswers` field in the JSON file contains the reviewer's input. The `inputContent` field contains the original data sent to A2I."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {
"scrolled": true
},
"outputs": [],
"source": [
"with open('a2i-output-video-moderation.json','r') as f:\n",
" print(json.dumps(json.loads(f.read()), indent=2))"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Cleanup"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Delete A2I flow definition: this step may take a few minutes if human loop tasks in the queue and not processed\n",
"sagemaker.delete_flow_definition(FlowDefinitionName=flow_def_name)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Delete A2I custom human task UI\n",
"sagemaker.delete_human_task_ui(HumanTaskUiName=human_task_name)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Delete sample video from S3 bucket\n",
"s3.delete_object(Bucket=data_bucket, Key=s3_key)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Conclusion\n",
"In this lab, we moderated a video using Rekognition Video Moderation API. And set up a customized A2I UI with customized rules to review the moderation result."
]
}
],
"metadata": {
"instance_type": "ml.t3.medium",
"kernelspec": {
"display_name": "Python 3 (Data Science)",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.7.10"
},
"vscode": {
"interpreter": {
"hash": "aee8b7b246df8f9039afb4144a1f6fd8d2ca17a180786b69acc140d282b71a49"
}
}
},
"nbformat": 4,
"nbformat_minor": 4
}