AWSTemplateFormatVersion: '2010-09-09'
Description: AWS CloudFormation template to deploy SageMaker annotation and modeling resources
Parameters:
  CloudFrontEndpoint:
    Type: String
    Description: The endpoint URL of the PACS, used by the PreLabel Lambda function to construct the source URL of each DICOM image
  NotebookInstanceName:
    Type: String
    Default: dicomannotationbookinstance
  NotebookInstanceType:
    Type: String
    Default: ml.m5.xlarge
    AllowedValues:
      - ml.c4.2xlarge
      - ml.c4.4xlarge
      - ml.c4.8xlarge
      - ml.c4.xlarge
      - ml.c5.18xlarge
      - ml.c5.2xlarge
      - ml.c5.4xlarge
      - ml.c5.9xlarge
      - ml.c5.xlarge
      - ml.c5d.18xlarge
      - ml.c5d.2xlarge
      - ml.c5d.4xlarge
      - ml.c5d.9xlarge
      - ml.c5d.xlarge
      - ml.m4.10xlarge
      - ml.m4.16xlarge
      - ml.m4.2xlarge
      - ml.m4.4xlarge
      - ml.m4.xlarge
      - ml.m5.12xlarge
      - ml.m5.24xlarge
      - ml.m5.2xlarge
      - ml.m5.4xlarge
      - ml.m5.xlarge
      - ml.p2.16xlarge
      - ml.p2.8xlarge
      - ml.p2.xlarge
      - ml.p3.16xlarge
      - ml.p3.2xlarge
      - ml.p3.8xlarge
      - ml.t2.2xlarge
      - ml.t2.large
      - ml.t2.medium
      - ml.t2.xlarge
      - ml.t3.2xlarge
      - ml.t3.large
      - ml.t3.medium
      - ml.t3.xlarge
Resources:
  AnnotationS3Bucket:
    Type: 'AWS::S3::Bucket'
    Properties:
      BucketName: !Sub sagemaker-groundtruth-label-dicom-${AWS::AccountId}
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      VersioningConfiguration:
        Status: Enabled
  SMGTLabelingExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub SMGTLabelingExecutionRole-${AWS::AccountId}
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - sagemaker.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/AmazonS3FullAccess
        - arn:aws:iam::aws:policy/AmazonSageMakerFullAccess
        - arn:aws:iam::aws:policy/AWSLambda_FullAccess
      Path: /
  ConsolidationLambdaSMGTExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Path: /
  PreLabelingLambdaSMGTExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
        - arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess
      Path: /
  PreLabelTaskLambda:
    Type: AWS::Lambda::Function
    DependsOn:
      - PreLabelingLambdaSMGTExecutionRole
    Properties:
      Code:
        ZipFile: |
          import json
          import os
          import base64
          from urllib.parse import urlparse
          import boto3
          import logging

          logger = logging.getLogger()
          logger.setLevel(logging.INFO)

          def lambda_handler(event, context):
              logger.debug(json.dumps(event, indent=2))

              source = event['dataObject']['source'] if "source" in event['dataObject'] else None
              source_ref = event['dataObject']['source-ref'] if "source-ref" in event['dataObject'] else None
              labels = event['dataObject']['labels'] if "labels" in event['dataObject'] else None

              task_object = source if source is not None else source_ref
              # Construct the DICOM instance URL served by the PACS behind the CloudFront endpoint
              if task_object is not None:
                  task_object = '/'.join([os.environ['EndpointURL'], 'instances', task_object, 'file'])

              # Build response object
              output = {
                  "taskInput": {
                      "taskObject": task_object
                  },
                  "isHumanAnnotationRequired": "true"
              }

              if labels is not None:
                  output['taskInput']['labels'] = labels

              logger.info(json.dumps(output, indent=2))

              # If neither source nor source-ref is specified, mark the annotation as failed
              if task_object is None:
                  print(" Failed to pre-process {} !".format(event["labelingJobArn"]))
                  output["isHumanAnnotationRequired"] = "false"

              return output
      Role: !GetAtt 'PreLabelingLambdaSMGTExecutionRole.Arn'
      FunctionName: gt-prelabel-task-lambda
      Timeout: 60
      Handler: index.lambda_handler
      Runtime: python3.8
      MemorySize: 128
      Environment:
        Variables:
          EndpointURL: !Ref CloudFrontEndpoint
  PostLabelTaskLambda:
    Type: AWS::Lambda::Function
    DependsOn:
      - ConsolidationLambdaSMGTExecutionRole
    Properties:
      Code:
        ZipFile: |
          import json
          import sys
          from s3_helper import S3Client

          def lambda_handler(event, context):
              print(json.dumps(event, indent=2))

              labeling_job_arn = event["labelingJobArn"]
              label_attribute_name = event["labelAttributeName"]

              label_categories = None
              if "labelCategories" in event:
                  label_categories = event["labelCategories"]
                  print(" Label Categories are : {}".format(label_categories))

              payload = event["payload"]
              role_arn = event["roleArn"]

              output_config = None  # Output S3 location. You can choose to write your annotation to this location
              if "outputConfig" in event:
                  output_config = event["outputConfig"]

              kms_key_id = None
              if "kmsKeyId" in event:
                  kms_key_id = event["kmsKeyId"]

              s3_client = S3Client(role_arn, kms_key_id)

              return do_consolidation(labeling_job_arn, payload, label_attribute_name, s3_client)

          def do_consolidation(labeling_job_arn, payload, label_attribute_name, s3_client):
              if "s3Uri" in payload:
                  s3_ref = payload["s3Uri"]
                  payload = json.loads(s3_client.get_object_from_s3(s3_ref))
                  print(payload)

              consolidated_output = []
              success_count = 0  # Number of data objects that were successfully consolidated
              failure_count = 0  # Number of data objects that failed in consolidation

              for p in range(len(payload)):
                  response = None
                  try:
                      dataset_object_id = payload[p]['datasetObjectId']
                      log_prefix = "[{}] data object id [{}] :".format(labeling_job_arn, dataset_object_id)
                      print("{} Consolidating annotations BEGIN ".format(log_prefix))

                      annotations = payload[p]['annotations']
                      print("{} Received Annotations from all workers {}".format(log_prefix, annotations))

                      for i in range(len(annotations)):
                          worker_id = annotations[i]["workerId"]
                          annotation_content = annotations[i]['annotationData'].get('content')
                          annotation_s3_uri = annotations[i]['annotationData'].get('s3uri')
                          annotation = annotation_content if annotation_s3_uri is None else s3_client.get_object_from_s3(
                              annotation_s3_uri)
                          annotation_from_single_worker = json.loads(annotation)

                          print("{} Received Annotations from worker [{}] is [{}]"
                                .format(log_prefix, worker_id, annotation_from_single_worker))

                      # TODO : Add your consolidation logic
                      consolidated_annotation = {"annotationsFromAllWorkers": annotations}

                      response = {
                          "datasetObjectId": dataset_object_id,
                          "consolidatedAnnotation": {
                              "content": {
                                  label_attribute_name: consolidated_annotation
                              }
                          }
                      }

                      success_count += 1
                      print("{} Consolidating annotations END ".format(log_prefix))

                      if response is not None:
                          consolidated_output.append(response)
                  except Exception:
                      failure_count += 1
                      print(" Consolidation failed for dataobject {}".format(p))
                      print(" Unexpected error: Consolidation failed." + str(sys.exc_info()[0]))

              print("Consolidation Complete. Success Count {} Failure Count {}".format(success_count, failure_count))

              print(" -- Consolidated Output -- ")
              print(consolidated_output)
              print(" ------------------------- ")
              return consolidated_output
      Role: !GetAtt 'ConsolidationLambdaSMGTExecutionRole.Arn'
      FunctionName: gt-postlabel-task-lambda
      Timeout: 60
      Handler: index.lambda_handler
      Runtime: python3.8
      MemorySize: 128
  SageMakerAPIExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub SageMakerAPIExecutionRoleName-${AWS::AccountId}
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service: sagemaker.amazonaws.com
            Action: sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: !Sub SageMakerAPIExecutionRolePolicyName-${AWS::AccountId}
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - sagemaker:CreateModel
                  - sagemaker:DeleteEndpointConfig
                  - sagemaker:DeleteEndpoint
                  - sagemaker:DeleteModel
                  - sagemaker:CreateEndpoint
                  - sagemaker:UpdateEndpoint
                  - sagemaker:StopNotebookInstance
                  - sagemaker:StartNotebookInstance
                  - sagemaker:CreateHyperParameterTuningJob
                  - sagemaker:DescribeHyperParameterTuningJob
                  - s3:CreateBucket
                  - s3:PutObject
                  - s3:GetObject
                  - s3:ListBucket
                  - sts:AssumeRole
                  - iam:PassRole
                  - iam:GetRole
                  - sagemaker:DescribeEndpointConfig
                  - sagemaker:CreateEndpointConfig
                  - sagemaker:DescribeEndpoint
                  - sagemaker:InvokeEndpoint
                  - sagemaker:CreateTrainingJob
                  - sagemaker:DescribeTrainingJob
                  - ecr:BatchGetImage
                  - ecr:SetRepositoryPolicy
                  - ecr:CompleteLayerUpload
                  - ecr:BatchDeleteImage
                  - ecr:UploadLayerPart
                  - ecr:DeleteRepositoryPolicy
                  - ecr:InitiateLayerUpload
                  - ecr:DeleteRepository
                  - ecr:PutImage
                  - ecr:BatchCheckLayerAvailability
                  - ecr:CreateRepository
                  - ecr:Describe*
                  - ecr:GetAuthorizationToken
                  - ecr:GetDownloadUrlForLayer
                  - ecr:StartImageScan
                  - logs:CreateLogDelivery
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:DeleteLogDelivery
                  - logs:Describe*
                  - logs:GetLogDelivery
                  - logs:GetLogEvents
                  - logs:ListLogDeliveries
                  - logs:PutLogEvents
                  - logs:PutResourcePolicy
                  - logs:UpdateLogDelivery
                Resource: "*"
  SageMakerNotebookInstance:
    Type: AWS::SageMaker::NotebookInstance
    Properties:
      NotebookInstanceName: !Ref NotebookInstanceName
      InstanceType: !Ref NotebookInstanceType
      RoleArn: !GetAtt SageMakerAPIExecutionRole.Arn
      VolumeSizeInGB: 50
      DefaultCodeRepository: https://github.com/aws-samples/annotate-medical-images-in-dicom-server-and-build-ml-models-on-amazon-sagemaker.git
Outputs:
  SMGTLabelingExecutionRole:
    Description: SageMaker Ground Truth labeling IAM role
    Value: !GetAtt 'SMGTLabelingExecutionRole.Arn'
  SageMakerNotebookInstance:
    Value: !Ref SageMakerNotebookInstance
  SageMakerAnnotationS3Bucket:
    Value: !Ref AnnotationS3Bucket
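
# --- Usage sketch (comments only; not part of the template) -------------------
# A minimal, hedged example of deploying this stack with boto3. The stack name,
# local file name, and CloudFront endpoint value below are illustrative
# assumptions, not values defined by this template. CAPABILITY_NAMED_IAM is
# required because the template creates IAM roles with explicit RoleName values.
#
#   import boto3
#
#   cfn = boto3.client("cloudformation")
#   with open("dicom-annotation-template.yaml") as f:   # assumed local filename
#       template_body = f.read()
#
#   cfn.create_stack(
#       StackName="dicom-annotation-stack",              # hypothetical stack name
#       TemplateBody=template_body,
#       Parameters=[{
#           "ParameterKey": "CloudFrontEndpoint",
#           "ParameterValue": "https://example.cloudfront.net",  # placeholder PACS endpoint
#       }],
#       Capabilities=["CAPABILITY_NAMED_IAM"],
#   )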