{
"cells": [
{
"cell_type": "markdown",
"metadata": {},
"source": [
"# Identify Worker Labeling Efficiency using SageMaker GroundTruth"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"---\n",
"\n",
"This notebook's CI test result for us-west-2 is as follows. CI test results in other regions can be found at the end of the notebook. \n",
"\n",
"\n",
"\n",
"---"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Introduction\n",
"Welcome to our example on identifying worker labeling efficiency for a SageMaker GroundTruth Labeling job. Before running this notebook, please make sure that all the instructions prior to the section 'Setup the Automated Accuracy Logic' from this [blog](https://aws.amazon.com/blogs/machine-learning/identifying-worker-labeling-efficiency-using-amazon-sagemaker-ground-truth/) have been followed. \n",
"\n",
"We first walk through some permissions and utility methods required for setting up. We then follow up with the logic of extracting individual worker responses and annotations from the output manifest and golden manifest to automatically calculate worker accuracy."
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Import the necessary libraries"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import json\n",
"import pprint\n",
"import boto3\n",
"from datetime import datetime\n",
"\n",
"%matplotlib inline\n",
"from urllib.parse import urlparse\n",
"import pandas as pd\n",
"import numpy as np\n",
"import sagemaker\n",
"from sagemaker import get_execution_role"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Permissions\n",
"\n",
"As a sanity check, let us verify that this notebook is in the same region as the s3 bucket setup earlier following the instructions in the blog. "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def run_sanity_check(bucket_name):\n",
" role = get_execution_role()\n",
" region = boto3.session.Session().region_name\n",
" s3 = boto3.client(\"s3\")\n",
" bucket_region = s3.get_bucket_location(Bucket=bucket_name)[\"LocationConstraint\"]\n",
" if bucket_region is None:\n",
" bucket_region = \"us-east-1\"\n",
" assert (\n",
" bucket_region == region\n",
" ), \"Your S3 bucket {} and this notebook need to be in the same region.\".format(bucket_name)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Initialize the s3 client"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"s3_client = boto3.client(\"s3\")"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Utility methods"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_bucket_keys(bucket_name, prefix):\n",
" keys = []\n",
" paginator = s3_client.get_paginator(\"list_objects\")\n",
" operation_parameters = {\"Bucket\": bucket_name, \"Prefix\": prefix}\n",
" page_iterator = paginator.paginate(**operation_parameters)\n",
" for page in page_iterator:\n",
" for obj in page[\"Contents\"]:\n",
" keys.append(obj[\"Key\"])\n",
" return keys"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_bucket_key_from_s3_uri(s3_path):\n",
" parsed_url = urlparse(s3_path, allow_fragments=False)\n",
" bucket = parsed_url.netloc\n",
" key = parsed_url.path.lstrip(\"/\")\n",
" return bucket, key"
]
},
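 {
  "cell_type": "markdown",
  "metadata": {},
  "source": [
   "For example, parsing a (hypothetical) S3 URI splits it into its bucket and key components:\n",
   "\n",
   "    get_bucket_key_from_s3_uri(\"s3://gec-sagemaker-blog/output/output.manifest\")\n",
   "    # returns ('gec-sagemaker-blog', 'output/output.manifest')"
  ]
 },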
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Calculate worker accuracy\n",
"Given the worker annotation and the true annotation, we calculate the accuracy of the worker. -1 is returned if we\n",
"do not know the true annotation."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def get_worker_accuracy(worker_annotation, true_annotation):\n",
" if true_annotation is None:\n",
" return -1\n",
" if worker_annotation == true_annotation:\n",
" return 1\n",
" else:\n",
" return 0"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Extract and process responses from golden manifest file and output manifest file.\n",
"For each data object, we fetch the worker response, the golden manifest annotation (if available) and the output manifest annotation. We then calculate the worker accuracy against the output manifest annotation and the golden manifest annotation. This is stored in a dictionary indexed by worker ID."
]
},
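 {
  "cell_type": "markdown",
  "metadata": {},
  "source": [
   "For reference, here is roughly what a line from each file might look like for a text classification job (the labels and field values below are purely illustrative). A golden manifest line maps an input manifest line number to the true label of that data object:\n",
   "\n",
   "    {\"0\": \"Person\"}\n",
   "\n",
   "An output manifest line carries the consolidated annotation; the class name sits under the `<label attribute>-metadata` key:\n",
   "\n",
   "    {\"source\": \"...\", \"person-animal-plant\": 0, \"person-animal-plant-metadata\": {\"class-name\": \"Person\", \"confidence\": 0.95, \"human-annotated\": \"yes\"}}"
  ]
 },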
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def extract_ground_truth_file(ground_truth_file):\n",
" dict_manifest_index_gt = {}\n",
" if ground_truth_file is not None:\n",
" bucket_name, key = get_bucket_key_from_s3_uri(ground_truth_file)\n",
" content = s3_client.get_object(Bucket=bucket_name, Key=key)[\"Body\"].read().decode(\"utf-8\")\n",
" content_list = content.split(\"\\n\")\n",
" for x in content_list:\n",
" if x:\n",
" dict_ = json.loads(x)\n",
" for key_, value_ in dict_.items():\n",
" dict_manifest_index_gt[key_] = value_\n",
" return dict_manifest_index_gt"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def extract_consolidated_file(consolidated_file, labeling_job_name):\n",
" dict_manifest_index_consolidated = {}\n",
" if consolidated_file is not None:\n",
" i = 0\n",
" bucket_name, key = get_bucket_key_from_s3_uri(consolidated_file)\n",
" content = s3_client.get_object(Bucket=bucket_name, Key=key)[\"Body\"].read().decode(\"utf-8\")\n",
" content_list = content.split(\"\\n\")\n",
" if len(content_list) > 0:\n",
" metadata_name = list(json.loads(content_list[0]).keys())[-1]\n",
" for x in content_list:\n",
" if x:\n",
" dict_manifest_index_consolidated[str(i)] = json.loads(x)[metadata_name][\n",
" \"class-name\"\n",
" ]\n",
" i += 1\n",
" return dict_manifest_index_consolidated"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def process_raw_annotation(worker_response):\n",
" raw_annotation = worker_response[\"answerContent\"][\"crowd-classifier\"][\"label\"]\n",
" return raw_annotation"
]
},
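 {
  "cell_type": "markdown",
  "metadata": {},
  "source": [
   "This method assumes the structure of the worker-response files written for the `crowd-classifier` template, which the code above navigates directly. A truncated, illustrative response file looks roughly like this, with one entry per worker in the `answers` list:\n",
   "\n",
   "    {\n",
   "        \"answers\": [\n",
   "            {\n",
   "                \"answerContent\": {\"crowd-classifier\": {\"label\": \"Person\"}},\n",
   "                \"workerId\": \"public.us-east-1.PDC\"\n",
   "            }\n",
   "        ]\n",
   "    }"
  ]
 },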
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def process_worker_responses(\n",
" bucket_name, all_keys_worker_response, dict_manifest_index_gt, dict_manifest_index_consolidated\n",
"):\n",
" dict_worker_response = {}\n",
" for worker_response_key in all_keys_worker_response:\n",
" if worker_response_key.endswith(\".json\"):\n",
" response_content = json.loads(\n",
" s3_client.get_object(Bucket=bucket_name, Key=worker_response_key)[\"Body\"]\n",
" .read()\n",
" .decode(\"utf-8\")\n",
" )\n",
" manifest_index = worker_response_key.split(\"/\")[-2]\n",
" list_worker_response = response_content[\"answers\"]\n",
" for worker_response in list_worker_response:\n",
" processed_annotation = process_raw_annotation(worker_response)\n",
" ground_truth_annotation = None\n",
" consolidated_annotation = None\n",
" if manifest_index in dict_manifest_index_gt:\n",
" ground_truth_annotation = dict_manifest_index_gt[manifest_index]\n",
" consolidated_annotation = dict_manifest_index_consolidated[manifest_index]\n",
" worker_accuracy_gt = get_worker_accuracy(\n",
" processed_annotation, ground_truth_annotation\n",
" )\n",
" worker_accuracy_consolidated = get_worker_accuracy(\n",
" processed_annotation, consolidated_annotation\n",
" )\n",
" worker_id = worker_response[\"workerId\"]\n",
" if worker_id not in dict_worker_response:\n",
" dict_worker_response[worker_id] = []\n",
" dict_worker_response[worker_id].append(\n",
" {\n",
" \"ManifestIndex\": manifest_index,\n",
" \"GTAccuracy\": worker_accuracy_gt,\n",
" \"ConsolidatedAccuracy\": worker_accuracy_consolidated,\n",
" }\n",
" )\n",
"\n",
" return dict_worker_response"
]
},
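 {
  "cell_type": "markdown",
  "metadata": {},
  "source": [
   "The returned dictionary maps each worker ID to that worker's per-object accuracy records, for example (hypothetical values):\n",
   "\n",
   "    {\n",
   "        \"public.us-east-1.PDC\": [\n",
   "            {\"ManifestIndex\": \"0\", \"GTAccuracy\": 1, \"ConsolidatedAccuracy\": 1},\n",
   "            {\"ManifestIndex\": \"1\", \"GTAccuracy\": -1, \"ConsolidatedAccuracy\": 1}\n",
   "        ]\n",
   "    }"
  ]
 },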
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Calculate the worker accuracy metrics and write to the given output location \n",
"This method is the starting point to this script and invokes other methods defined above. A sample invocation is shown in the following cells."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def write_worker_metrics(\n",
" labeling_job_output_location,\n",
" labeling_job_name,\n",
" s3_output_worker_metrics,\n",
" ground_truth_file=None,\n",
"):\n",
" # read the ground truth file and form a dict\n",
" dict_manifest_index_gt = extract_ground_truth_file(ground_truth_file)\n",
" if not labeling_job_output_location.endswith(\"/\"):\n",
" labeling_job_output_location = labeling_job_output_location + \"/\"\n",
" output_manifest = (\n",
" labeling_job_output_location + labeling_job_name + \"/manifests/output/output.manifest\"\n",
" )\n",
"\n",
" # read the output manifest and form a dict\n",
" dict_manifest_index_consolidated = extract_consolidated_file(output_manifest, labeling_job_name)\n",
"\n",
" # get the raw worker annotations for each data object id\n",
" worker_response_root = (\n",
" labeling_job_output_location + labeling_job_name + \"/annotations/worker-response\"\n",
" )\n",
" bucket_name, key = get_bucket_key_from_s3_uri(worker_response_root)\n",
"\n",
" all_keys_worker_response = get_bucket_keys(bucket_name, key)\n",
"\n",
" worker_response_dict = process_worker_responses(\n",
" bucket_name,\n",
" all_keys_worker_response,\n",
" dict_manifest_index_gt,\n",
" dict_manifest_index_consolidated,\n",
" )\n",
" output_dict = {}\n",
"\n",
" for worker_id, accuracy_list in worker_response_dict.items():\n",
" list_gt_accuracy = []\n",
" list_consolidated_accuracy = []\n",
" for y_ in accuracy_list:\n",
" list_gt_accuracy.append(y_[\"GTAccuracy\"])\n",
" list_consolidated_accuracy.append(y_[\"ConsolidatedAccuracy\"])\n",
" num_data_objects_golden = len(list_gt_accuracy) - list_gt_accuracy.count(-1)\n",
" list_gt_accuracy_only_annotated = [x for x in list_gt_accuracy if x != -1]\n",
"\n",
" if len(list_gt_accuracy_only_annotated) == 0:\n",
" avg_gt_accuracy = 0\n",
" else:\n",
" avg_gt_accuracy = np.mean(list_gt_accuracy_only_annotated)\n",
"\n",
" avg_consolidated_accuracy = np.mean(list_consolidated_accuracy)\n",
" num_data_objects = len(accuracy_list)\n",
" output_dict[worker_id] = {\n",
" \"Total Number Of Objects Annotated\": num_data_objects,\n",
" \"Number Of Golden Standard Objects Annotated\": num_data_objects_golden,\n",
" \"Average Accuracy Compared To Other Workers\": avg_consolidated_accuracy,\n",
" \"Average Golden Standard Accuracy\": avg_gt_accuracy,\n",
" }\n",
"\n",
" pp = pprint.PrettyPrinter(indent=4)\n",
" pp.pprint(output_dict)\n",
" bucket_name, key = get_bucket_key_from_s3_uri(s3_output_worker_metrics)\n",
" s3_client.put_object(Body=json.dumps(output_dict), Bucket=bucket_name, Key=key)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Below is a sample invocation of the script. Please replace with your own inputs.\n",
"* **bucket**: Please provide your bucket name. Our example bucket name is 'gec-sagemaker-blog'.
\n",
"\n",
"\n",
"* **labeling_job_output_location**: Please make sure this is the same as provided while creating the labeling job. In our example this is 's3://gec-sagemaker-blog/output'.
\n",
"* **labeling_job_name**: We can get the labeling job name from the Labeling jobs section under the SageMaker console.
\n",
"\n",
"\n",
"* **golden_answers**: This is the golden manifest file. Each line is a key value pair in JSON format. The key is the line number of the data object in the input manifest file provided while creating the labeling job. The value is the true label of the data object. This is an optional parameter.
\n",
"\n",
"\n",
"* **worker_metrics_output**: This s3 path of the output JSON file storing worker metrics. This file gets generated automatically. In our example this is 's3://gec-sagemaker-blog/worker_metrics.json'. Each entry in this JSON file is of the form where `public.us-east-1.PDC` is the worker ID, the **total number of objects** annotated by this worker is 3. We know the **true labels** for 2 out of these 3 objects. The golden manifest file **need not** store the true labels of all data objects. The worker correctly annotated the 2 data objects present in the golden manifest file with respect to the true annotation. The worker agreed with other workers for all 3 data objects.\n",
"\n",
" \n",
" 'public.us-east-1.PDC': {\n",
" 'Total Number Of Objects Annotated': 3,\n",
" 'Number Of Golden Standard Objects Annotated': 2,\n",
" 'Average Golden Standard Accuracy': 1.0,\n",
" 'Average Accuracy Compared To Other Workers': 1.0,\n",
" }\n",
" "
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Sample to invoke the script. Please replace with own inputs\n",
"bucket = \"gec-sagemaker-blog\"\n",
"bucket_root_url = \"s3://\" + bucket\n",
"golden_answers = bucket_root_url + \"/\" + \"golden.manifest\"\n",
"worker_metrics_output = bucket_root_url + \"/\" + \"worker_metrics.json\"\n",
"labeling_job_name = \"person-animal-plant\"\n",
"labeling_job_output_location = bucket_root_url"
]
},
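 {
  "cell_type": "markdown",
  "metadata": {},
  "source": [
   "If you have not created the golden manifest yet, the following cell is a minimal sketch of one way to generate and upload it, assuming the `golden_answers` location defined above. The line numbers and labels are hypothetical; replace them with the true labels for your own input manifest."
  ]
 },
 {
  "cell_type": "code",
  "execution_count": null,
  "metadata": {},
  "outputs": [],
  "source": [
   "# A minimal, illustrative sketch: map input manifest line numbers to true labels.\n",
   "# The line numbers and labels below are hypothetical; substitute your own.\n",
   "sample_golden_labels = {\"0\": \"Person\", \"1\": \"Animal\", \"2\": \"Plant\"}\n",
   "\n",
   "golden_lines = \"\\n\".join(json.dumps({k: v}) for k, v in sample_golden_labels.items())\n",
   "_, golden_key = get_bucket_key_from_s3_uri(golden_answers)\n",
   "# Uncomment to upload the sample golden manifest to the location configured above:\n",
   "# s3_client.put_object(Body=golden_lines, Bucket=bucket, Key=golden_key)"
  ]
 },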
{
"cell_type": "markdown",
"metadata": {},
"source": [
"We first run a sanity check to ensure that the bucket setup earlier as part of the blog instructions exists in the\n",
" same region as this notebook. If the assertion fails, please cross check that the notebook is in the same region as the bucket. We then call the write_worker_metrics function with the inputs defined above."
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"run_sanity_check(bucket)\n",
"write_worker_metrics(\n",
" labeling_job_output_location, labeling_job_name, worker_metrics_output, golden_answers\n",
")"
]
},
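 {
  "cell_type": "markdown",
  "metadata": {},
  "source": [
   "Optionally, we can read the worker metrics file back from S3 and view it as a table. This is a small convenience sketch; it assumes the `write_worker_metrics` call above completed and wrote the JSON file to `worker_metrics_output`."
  ]
 },
 {
  "cell_type": "code",
  "execution_count": null,
  "metadata": {},
  "outputs": [],
  "source": [
   "import pandas as pd\n",
   "\n",
   "# Fetch the metrics JSON written above and display one row per worker\n",
   "metrics_bucket, metrics_key = get_bucket_key_from_s3_uri(worker_metrics_output)\n",
   "metrics = json.loads(\n",
   "    s3_client.get_object(Bucket=metrics_bucket, Key=metrics_key)[\"Body\"].read().decode(\"utf-8\")\n",
   ")\n",
   "pd.DataFrame.from_dict(metrics, orient=\"index\")"
  ]
 },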
{
"cell_type": "markdown",
"metadata": {},
"source": [
"### Conclusion"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"In this notebook, we walked through analyzing the accuracy of workers for a completed Text Classification labeling job. We calculated the worker accuracy against the consoldiated output annotation and the golden annotation (when available). This shows us a measure of how often a worker agrees with the other workers as well as the the correctness of a worker with respect to the true annotation.\n",
"\n",
"This script is setup to calculate worker accuracy for our classification modalities. However, it is possible to use a similar logic for other Ground Truth modalities as well. We use different accuracy metrics for different modalities, so code snippet calculating the worker accuracy metric would likely need to be updated. In case of classification, it is 0 or 1 but for modalities like bounding box we could use metrics like Intersection over Union. Please note that other utility methods like processing the worker responses will also need to be updated accordingly. \n"
]
},
{
"attachments": {},
"cell_type": "markdown",
"metadata": {},
"source": [
"## Notebook CI Test Results\n",
"\n",
"This notebook was tested in multiple regions. The test results are as follows, except for us-west-2 which is shown at the top of the notebook.\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n",
"\n"
]
}
],
"metadata": {
"kernelspec": {
"display_name": "conda_python3",
"language": "python",
"name": "conda_python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.6.5"
}
},
"nbformat": 4,
"nbformat_minor": 2
}