# Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# CloudFormation template that provisions everything referenced by the
# notebook configuration written in CodeNotebookLifeCycleConfig:
#   * an S3 input bucket with CORS rules,
#   * pre- and post-labeling Lambda functions and their execution roles,
#   * a SageMaker execution role and a notebook instance with its own role.
AWSTemplateFormatVersion: '2010-09-09'

Description: >
  SQuAD SageMaker Ground Truth Annotator

Parameters:
  Prefix:
    Type: String
    Description: Prefix of all resources deployed in this solution.
    Default: gt-hf-squad

Resources:
  # Execution role for PostLabelTaskLambda: read/write objects in the input
  # bucket plus the basic Lambda logging permissions.
  ConsolidationLambdaSMGTExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "${Prefix}-consolidation-lambda-smgt-execution-role"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: !Sub "${Prefix}-ConsolidationLambdaSMGTExecutionRole-s3-policy"
          PolicyDocument:
            # Quoted so YAML loaders do not turn the version into a date.
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:GetObjectVersion
                  - s3:PutObject
                Resource: !Sub "${LabelingJobInputBucket.Arn}/*"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  # Execution role for PreLabelTaskLambda: read-only access to objects in the
  # input bucket plus the basic Lambda logging permissions.
  PrelabelingLambdaSMGTExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "${Prefix}-pre-labeling-lambda-smgt-execution-role"
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: !Sub "${Prefix}-PrelabelingLambdaSMGTExecutionRole-s3-policy"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:GetObjectVersion
                Resource: !Sub "${LabelingJobInputBucket.Arn}/*"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole

  # Role exported as SageMakerRoleArn for use with create_labeling_job.
  SageMakerRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Action: sts:AssumeRole
            Principal:
              Service: sagemaker.amazonaws.com
            Effect: Allow
            Sid: ""
      RoleName: !Sub "${Prefix}-sagemaker-role"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonSageMakerGroundTruthExecution
        - arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
      MaxSessionDuration: 43200

  # Role assumed by the notebook instance (CodeNotebookInstance).
  NotebookRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Action: sts:AssumeRole
            Principal:
              Service: sagemaker.amazonaws.com
            Effect: Allow
            Sid: ""
      RoleName: !Sub "${Prefix}-notebook-role"
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
      MaxSessionDuration: 43200

  # Pre-labeling Lambda: loads the JSON document referenced by
  # event["dataObject"]["source"] from S3 and returns it as the task input.
  PreLabelTaskLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub "${Prefix}-sagemaker-pre-label-task"
      Code:
        ZipFile: |
          # Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
          import json

          import boto3

          s3 = boto3.resource("s3")


          def uri_to_s3_obj(s3_uri):
              if not s3_uri.startswith("s3://"):
                  # This is a local path, indicate using None
                  return None
              bucket, key = s3_uri.split("s3://")[1].split("/", 1)
              return s3.Object(bucket, key)


          def fetch_s3(s3_uri):
              print(f"FETCH {s3_uri}")
              obj = uri_to_s3_obj(s3_uri)
              if obj is None:
                  # Fail with a clear message instead of AttributeError on None.
                  raise ValueError(f"Not an S3 URI: {s3_uri}")
              return obj.get()["Body"].read()


          def lambda_handler(event, context):
              print("Received event: ", event)
              source = json.loads(fetch_s3(event["dataObject"]["source"]))
              return {"taskInput": {"source": source}}
      Handler: index.lambda_handler
      Runtime: python3.12
      Role: !GetAtt PrelabelingLambdaSMGTExecutionRole.Arn
      # NOTE(review): 3 s leaves little headroom for a cold start plus an S3
      # read - confirm against observed durations.
      Timeout: 3

  # Post-labeling (consolidation) Lambda: for every data object in the payload
  # it takes the first worker annotation, writes its "submission" to
  # <outputConfig>/<datasetObjectId>.json and returns a reference to that
  # object under the label attribute name.
  # The inline code is kept terse on purpose so it stays small.
  PostLabelTaskLambda:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub "${Prefix}-sagemaker-post-label-task"
      Code:
        ZipFile: |
          # Copyright 2021 Amazon.com, Inc. or its affiliates. All Rights Reserved.
          import json

          import boto3

          s3 = boto3.resource("s3")


          def uri_to_s3_obj(s3_uri):
              if not s3_uri.startswith("s3://"):
                  # This is a local path, indicate using None
                  return None
              bucket, key = s3_uri.split("s3://")[1].split("/", 1)
              return s3.Object(bucket, key)


          def fetch_s3(s3_uri):
              print(f"FETCH {s3_uri}")
              obj = uri_to_s3_obj(s3_uri)
              if obj is None:
                  # Fail with a clear message instead of AttributeError on None.
                  raise ValueError(f"Not an S3 URI: {s3_uri}")
              return obj.get()["Body"].read()


          def put_s3(s3_uri, data):
              print(f"PUT {s3_uri}")
              obj = uri_to_s3_obj(s3_uri)
              if obj:
                  obj.put(Body=data)
                  return
              print("FAILED TO PUT")


          def lambda_handler(event, context):
              print("Received event: " + json.dumps(event, indent=2))

              labeling_job_arn = event["labelingJobArn"]
              label_attribute_name = event["labelAttributeName"]

              # Check the same key that is read; str() because the value
              # is not necessarily a string.
              if "labelCategories" in event:
                  print(" Label Categories are : " + str(event["labelCategories"]))

              payload = event["payload"]

              # Output S3 location the consolidated annotations are written to.
              output_config = None
              if "outputConfig" in event:
                  output_config = event["outputConfig"].rstrip("/")

              return do_consolidation(labeling_job_arn, payload, label_attribute_name, output_config)


          def do_consolidation(labeling_job_arn, payload, label_attribute_name, output_config):
              # The payload is either inline or a reference to a JSON file in S3.
              if "s3Uri" in payload:
                  payload = json.loads(fetch_s3(payload["s3Uri"]))
                  print(payload)

              consolidated_output = []
              success_count = 0
              # Never incremented: an exception below aborts the whole batch.
              failure_count = 0

              for item in payload:
                  dataset_object_id = item["datasetObjectId"]
                  log_prefix = "[{}] data object id [{}] :".format(labeling_job_arn, dataset_object_id)
                  print("{} Consolidating annotations BEGIN ".format(log_prefix))

                  annotations = item["annotations"]
                  print("{} Received Annotations from all workers {}".format(log_prefix, annotations))

                  # Only the first worker's annotation is used.
                  content = json.loads(annotations[0]["annotationData"].get("content"))
                  s3_uri = f"{output_config}/{dataset_object_id}.json"
                  put_s3(s3_uri, content["submission"])

                  consolidated_output.append({
                      "datasetObjectId": dataset_object_id,
                      "consolidatedAnnotation": {
                          "content": {label_attribute_name: {"s3Uri": s3_uri}}
                      },
                  })
                  success_count += 1
                  print("{} Consolidating annotations END ".format(log_prefix))

              print("Consolidation Complete. Success Count {} Failure Count {}".format(success_count, failure_count))
              print(" -- Consolidated Output -- ")
              print(consolidated_output)
              print(" ------------------------- ")
              return consolidated_output
      Handler: index.lambda_handler
      Runtime: python3.12
      Role: !GetAtt ConsolidationLambdaSMGTExecutionRole.Arn
      # NOTE(review): 3 s may be too short when a batch needs several S3
      # writes - confirm against observed durations.
      Timeout: 3

  # OnStart script for the notebook instance: writes the ARNs and bucket name
  # created by this stack to a JSON file and clones the sample repository.
  # The script body goes through !Sub, so only ${...} is substituted; the
  # Python f-string placeholders ({name} without "$") are left untouched.
  CodeNotebookLifeCycleConfig:
    Type: AWS::SageMaker::NotebookInstanceLifecycleConfig
    Properties:
      OnStart:
        - Content:
            Fn::Base64: !Sub |
              #!/usr/bin/env python3
              # This script passes any required args from the environment into the notebook.
              # SageMaker Ground Truth requires special bucket settings, pre/post lambdas,
              # and a well configured execution role to work, so rather than asking a user
              # to generate all of this, generate in cloudformation and pass into notebook.
              import json
              import os
              import subprocess

              config = {
                  "PreLabelTaskLambdaArn": "${PreLabelTaskLambda.Arn}",
                  "PostLabelTaskLambdaArn": "${PostLabelTaskLambda.Arn}",
                  "LabelingJobInputBucket": "${LabelingJobInputBucket}",
                  "SageMakerRoleArn": "${SageMakerRole.Arn}",
              }
              print("Configuration variables:", config)

              filename = "/home/ec2-user/SageMaker/hf-gt-custom-qa.json"
              print(f"Writing configuration to {filename}")
              with open(filename, "w") as f:
                  json.dump(config, f)

              repo_url = "https://github.com/aws-samples/hf-gt-custom-qa"
              repo_dir = "/home/ec2-user/SageMaker/hf-gt-custom-qa"
              if os.path.isdir(repo_dir):
                  # The script runs on every start; do not clone twice.
                  print(f"{repo_dir} already exists, skipping clone")
              else:
                  try:
                      # subprocess.run does not raise on a non-zero exit status,
                      # so the return code has to be checked explicitly.
                      result = subprocess.run(["git", "clone", repo_url, repo_dir])
                      if result.returncode != 0:
                          print("Failed to clone repo - might still be private?")
                  except OSError as e:
                      # Best effort: a missing git binary must not fail the start.
                      print("Failed to clone repo - might still be private?")
                      print(e)

              # shell=True is needed for the glob, which matches both the
              # configuration file and the cloned directory.
              subprocess.run("chown -R ec2-user /home/ec2-user/SageMaker/hf-gt-custom-qa*", shell=True)

  CodeNotebookInstance:
    Type: AWS::SageMaker::NotebookInstance
    Properties:
      # TODO: Clone AWS samples when we have it ready.
      # DefaultCodeRepository: String
      InstanceType: ml.t2.large
      LifecycleConfigName: !GetAtt CodeNotebookLifeCycleConfig.NotebookInstanceLifecycleConfigName
      NotebookInstanceName: !Sub "${Prefix}-notebook"
      PlatformIdentifier: notebook-al2-v1
      RoleArn: !GetAtt NotebookRole.Arn
      VolumeSizeInGB: 50

  # Input bucket: all public access blocked, CORS enabled for browser access.
  LabelingJobInputBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Sub "${Prefix}-${AWS::AccountId}-${AWS::Region}-sagemaker-input"
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      CorsConfiguration:
        CorsRules:
          - AllowedHeaders: ["*"]
            AllowedMethods: [GET, PUT, POST, HEAD]
            AllowedOrigins: ["*"]
            ExposedHeaders: [Etag]
            MaxAge: 3000

Outputs:
  PreLabelTaskLambdaArn:
    Description: "PreLabeling function ARN for use in SageMaker Ground Truth"
    Value: !GetAtt PreLabelTaskLambda.Arn
  PostLabelTaskLambdaArn:
    Description: "PostLabeling function ARN for use in SageMaker Ground Truth"
    Value: !GetAtt PostLabelTaskLambda.Arn
  LabelingJobBucketName:
    Description: "Bucket with appropriate CORS to serve as input bucket"
    Value: !Ref LabelingJobInputBucket
  SageMakerRoleArn:
    Description: "Role to use with create_labeling_job API in SageMaker Ground Truth"
    Value: !GetAtt SageMakerRole.Arn