# MIT License # # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Permission is hereby granted, free of charge, to any person obtaining # a copy of this software and associated documentation files (the # "Software"), to deal in the Software without restriction, including # without limitation the rights to use, copy, modify, merge, publish, # distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so, subject # to the following conditions: # # The above copyright notice and this permission notice shall be # included in all copies or substantial portions of the Software. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, # EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF # MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND # NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS # BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN # ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN # CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. 
from io import BytesIO
import json
import tarfile
import time
from urllib.parse import unquote_plus

import boto3

# Names of the SSM parameters this function reads its configuration from.
_FLOW_DEFINITION_PARAMETER = 'FlowDefARN-TCA2I'
_BUCKET_PARAMETER = 'S3BucketName-TCA2I'


def _load_settings(ssm_client):
    """Return ``(flow_definition_arn, primary_bucket_name)`` read from SSM.

    Raises:
        RuntimeError: if either parameter is absent from the SSM response.
    """
    response = ssm_client.get_parameters(
        Names=[_FLOW_DEFINITION_PARAMETER, _BUCKET_PARAMETER],
        WithDecryption=True,
    )
    values = {
        parameter['Name']: parameter['Value']
        for parameter in response['Parameters']
    }
    # Fail fast with a readable message instead of hitting an unbound local
    # variable later, after S3 objects have already been written.
    missing = [
        name
        for name in (_FLOW_DEFINITION_PARAMETER, _BUCKET_PARAMETER)
        if name not in values
    ]
    if missing:
        raise RuntimeError('Missing SSM parameters: ' + ', '.join(missing))
    return values[_FLOW_DEFINITION_PARAMETER], values[_BUCKET_PARAMETER]


def _extract_results(s3_client, source_bucket, source_key,
                     target_bucket, target_key):
    """Unpack the .tar.gz results archive, store its content, and parse it.

    Every regular file found in the archive is uploaded to
    ``target_bucket``/``target_key`` (later files overwrite earlier ones, as
    they all share one key). The content of the last uploaded file is parsed
    as JSON and returned.

    Raises:
        ValueError: if the archive contains no regular file.
    """
    archive_bytes = s3_client.get_object(
        Bucket=source_bucket, Key=source_key)['Body'].read()

    extracted_bytes = None
    with tarfile.open(fileobj=BytesIO(archive_bytes)) as tar:
        for member in tar:
            if not member.isfile():
                continue
            extracted_bytes = tar.extractfile(member).read()
            s3_client.upload_fileobj(
                BytesIO(extracted_bytes),
                Bucket=target_bucket,
                Key=target_key,
            )

    if extracted_bytes is None:
        raise ValueError(
            'No regular file found in archive s3://'
            + source_bucket + '/' + source_key
        )

    # Parse the bytes already in memory rather than downloading the object
    # that was just uploaded.
    return json.loads(extracted_bytes)


def _build_human_loop_input(original_text, entities):
    """Build the input document handed to the human review loop."""
    return {
        'originalText': original_text,
        # List of identified entities
        'entities': entities,
        # Types of entities that need to be recognized
        'labels': [{'label': 'device',
                    'shortDisplayName': 'dvc',
                    'fullDisplayName': 'Device'}],
        # Entities that have already been identified, pre-marked to save
        # time for the human reviewers
        'initialValue': [
            {
                'label': entity['Type'].lower(),
                'startOffset': entity['BeginOffset'],
                'endOffset': entity['EndOffset'],
            }
            for entity in entities
        ],
    }


def lambda_handler(event, context):
    """Start a human review loop for a newly created Comprehend result archive.

    The handler reads the flow definition ARN and the primary bucket name
    from SSM, extracts the .tar.gz archive referenced by the first record of
    the S3 event, stores the extracted results in the primary bucket, loads
    the matching Textract text, and starts an A2I human loop containing the
    text and the entities found in the results.

    Args:
        event: S3 event notification; only ``event['Records'][0]`` is used.
        context: Lambda context object (unused).

    Returns:
        0 after the human loop has been started.

    Raises:
        RuntimeError: if a required SSM parameter is missing.
        ValueError: if the object key has fewer than three ``/``-separated
            segments, or the archive contains no regular file.
    """
    ssm_client = boto3.client('ssm')
    a2i_client = boto3.client('sagemaker-a2i-runtime')
    s3_client = boto3.client('s3')

    hrw_arn, primary_s3_bucket = _load_settings(ssm_client)

    # Get details of the object that was just created by Comprehend.
    s3_record = event['Records'][0]['s3']
    bucket = s3_record['bucket']['name']
    # S3 event notifications deliver the object key URL-encoded; decode it
    # before using it in API calls.
    key = unquote_plus(s3_record['object']['key'])

    # The third path segment is used as the unique name for this file.
    key_segments = key.split('/')
    if len(key_segments) < 3:
        raise ValueError('Unexpected object key layout: ' + key)
    extracted_file_key = 'comprehend-output/raw/' + key_segments[2] + '-results'

    custom_entities_recognition_results = _extract_results(
        s3_client, bucket, key, primary_s3_bucket, extracted_file_key)

    # Load the original text extracted using Amazon Textract.
    file_identifier = custom_entities_recognition_results['File']
    textract_results_key = 'textract-output/processed/' + file_identifier
    text_file_object = s3_client.get_object(
        Bucket=primary_s3_bucket, Key=textract_results_key)
    original_text_file = text_file_object['Body'].read().decode(
        'utf-8', 'ignore')

    human_loop_input = _build_human_loop_input(
        original_text_file,
        custom_entities_recognition_results['Entities'],
    )

    # NOTE(review): a millisecond timestamp can collide when two invocations
    # run in the same millisecond; also confirm this name format satisfies
    # the A2I HumanLoopName constraints.
    human_loop_name = 'TCA2I-' + str(int(round(time.time() * 1000)))
    print('Starting human loop - ' + human_loop_name)

    a2i_client.start_human_loop(
        HumanLoopName=human_loop_name,
        FlowDefinitionArn=hrw_arn,
        HumanLoopInput={
            'InputContent': json.dumps(human_loop_input)
        }
    )

    return 0