# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
import os
import json
import boto3
import logging
from S3Functions import S3
from boto3.dynamodb.types import TypeDeserializer

ddb = boto3.client('dynamodb')
deserializer = TypeDeserializer()
logger = logging.getLogger(__name__)

MAX_FILES_TO_REDACT = 10


def gen_list_for_map(documents: list) -> list:
    """
    Creates a list of document batches (lists) to be used with the Step Functions Map state
    (https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-map-state.html).
    Each batch within the list contains up to 10 documents, which means each Step Functions
    chain under the Map state processes up to 10 documents at a time. With the Step Functions
    Map state you can have up to 40 parallel executions without throttling, which means you
    can process up to 400 documents in parallel (10 each x 40 chains). You can fine-tune the
    number 10 (MAX_FILES_TO_REDACT) based on the total volume of documents you intend to
    process at once and the size of the documents. If you have larger documents, it is
    recommended that you use a smaller number of files per chain, since the redaction Lambda
    can be resource intensive.
    """
    try:
        if len(documents) <= MAX_FILES_TO_REDACT:
            docs_to_redact = [documents]
        else:
            docs_to_redact = []
            docs = []
            for doc in documents:
                docs.append(doc)
                if len(docs) == MAX_FILES_TO_REDACT:
                    # Batch is full; start a new one
                    docs_to_redact.append(docs)
                    docs = []
            if docs:
                # Append the final, partially filled batch
                docs_to_redact.append(docs)
        return docs_to_redact
    except Exception as e:
        logger.error(e)
        raise


def update_error_state(env_vars, event):
    logger.debug('Updating de-identification status to failed')
    wf_update = f"UPDATE \"{env_vars['IDP_TABLE']}\" SET de_identification_status=? WHERE part_key=? AND sort_key=?"
    ddb.execute_statement(Statement=wf_update,
                          Parameters=[
                              {'S': 'failed'},
                              {'S': event['workflow_id']},
                              {'S': f"input/{event['workflow_id']}/"}
                          ])


def lambda_handler(event, context):
    log_level = os.environ.get('LOG_LEVEL', 'INFO')
    logger.setLevel(log_level)
    logger.info(json.dumps(event))

    env_vars = {}
    for name, value in os.environ.items():
        env_vars[name] = value

    workflow_id = event["workflow_id"]
    phi_output_dir = event["phi_output_dir"]
    bucket = event["bucket"]
    s3 = S3(bucket=bucket, log_level=log_level)

    try:
        logger.info("Getting all the output prefixes from workflow output")
        documents = s3.list_prefixes(prefix=f"public/output/{workflow_id}")

        logger.info("Copying PHI entity outputs and original documents to workflow output prefix")
        file_list = s3.list_objects(prefix=phi_output_dir,
                                    filters=["ComprehendMedicalS3WriteTestFile", "Manifest"])
        for file in file_list:
            fragments = file.split('/')[-2:]
            phi_output = os.path.basename(file).split('.')[0] + ".comp-med"
            """
            Amazon Comprehend Medical outputs follow the naming convention
            <document_name>.txt.out, where <document_name> is the name of the original
            document file, such as my_doc.pdf.
            Customize the code section below appropriately to parse out the document name
            in case you update the naming convention of the txt file generated by
            Amazon Textract in the Lambda function idp-process-textract-output.py
            """
            document_name = os.path.basename(file).replace('.txt.out', '')
            workflow_output = f"public/output/{workflow_id}/{fragments[0]}"
            # Move the PHI output file
            s3.move_object(source_object=file,
                           destination_object=f"{workflow_output}/{phi_output}")
            # Move the original document
            s3.move_object(source_object=f"public/input/{workflow_id}/{document_name}",
                           destination_object=f"{workflow_output}/orig-doc/{document_name}")

        logger.info("Copying PHI entity Manifest file to target workflow prefix")
        manifest_file = s3.list_objects(prefix=phi_output_dir,
                                        filters=["/failed/", "/success/"],
                                        search=["Manifest"])[0]
        s3.move_object(source_object=manifest_file,
                       destination_object=f"public/output/{workflow_id}/Manifest")

        logger.debug("Getting retain_orig_docs status from database")
        stmt = f"SELECT \"retain_orig_docs\" FROM \"{env_vars['IDP_TABLE']}\" WHERE part_key=? AND sort_key=?"
        logger.debug(stmt)
        ddb_response = ddb.execute_statement(Statement=stmt,
                                             Parameters=[
                                                 {'S': workflow_id},
                                                 {'S': f"input/{workflow_id}/"}
                                             ])
        logger.debug(ddb_response)
        deserialized_document = {k: deserializer.deserialize(v) for k, v in ddb_response['Items'][0].items()}
        logger.debug(deserialized_document)
        retain_docs = deserialized_document['retain_orig_docs']
        logger.debug(retain_docs)

        map_list = gen_list_for_map(documents=documents)
        logger.debug(map_list)

        if map_list:
            return dict(workflow_id=workflow_id,
                        input_prefix=f"input/{workflow_id}/",
                        bucket=bucket,
                        retain_docs=retain_docs,
                        doc_list=map_list)
        else:
            update_error_state(env_vars=env_vars, event=event)
            return dict(error="Error occurred while copying PHI output file. map_list is None")
    except Exception as e:
        logger.error("Error occurred...")
        logger.error(e)
        update_error_state(env_vars=env_vars, event=event)
        return dict(error="Error occurred while copying PHI output file")
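

# The block below is an illustrative local-test sketch, not part of the Lambda
# deployment: it only shows how gen_list_for_map batches a list of S3 output
# prefixes for the Step Functions Map state. The sample workflow id and prefixes
# are hypothetical.
if __name__ == "__main__":
    sample_prefixes = [f"public/output/wf-0001/doc-{i}/" for i in range(1, 26)]
    batches = gen_list_for_map(documents=sample_prefixes)
    # With MAX_FILES_TO_REDACT = 10, the 25 sample prefixes produce batches of 10, 10 and 5.
    for n, batch in enumerate(batches, start=1):
        print(f"batch {n}: {len(batch)} prefixes, first: {batch[0]}")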