AWSTemplateFormatVersion: '2010-09-09'
Description: 'Simple Textract and Lambda demo. Incoming email messages are received by SES, persisted to S3 and any PDF, PNG or JPG attachments have their text content extracted.'

# Pipeline overview (as wired by the resources below):
#   1. The imported incoming-email SNS topic invokes the EmailParsing function.
#   2. EmailParsing copies each attachment into AttachmentsS3Bucket, starts a
#      Textract document-analysis job for it and records the email in
#      EmailsDynamoTable.
#   3. Textract publishes job completion to TextractCallbackSNSTopic, which
#      invokes the TextractCallback function.
#   4. TextractCallback writes keyvalues.txt / tablevalues.txt / rawtext.json
#      to the TextractDocumentOutput bucket and the raw text to
#      RawdataDynamoTable.
#   5. Object-created events for "*.txt" in TextractDocumentOutput invoke
#      Textract2DDBUpdate, which loads the JSON into KeyValuesDynamoTable or
#      TableValuesDynamoTable.

Resources:
  # Bucket receiving the extracted output. The DependsOn makes sure the
  # Lambda invoke permission exists before the notification configuration
  # referencing the function is applied.
  TextractDocumentOutput:
    Type: 'AWS::S3::Bucket'
    DeletionPolicy: Retain
    DependsOn:
      - PermissionForTextract2DDBUpdateToInvokeLambda
    Properties:
      BucketName: !Sub 'textract-demo-output-${AWS::AccountId}'
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      AccessControl: BucketOwnerFullControl
      LoggingConfiguration:
        DestinationBucketName:
          Fn::ImportValue: !Sub '${IncomingEmailStack}-S3BucketLogs'
        LogFilePrefix: !Sub '/logs/textract-demo-docoutput-${AWS::Region}-${AWS::AccountId}/'
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        RestrictPublicBuckets: true
      NotificationConfiguration:
        LambdaConfigurations:
          # Only the ".txt" outputs (key/value and table JSON) trigger the
          # DynamoDB loader; rawtext.json does not match the suffix.
          - Event: 's3:ObjectCreated:*'
            Filter:
              S3Key:
                Rules:
                  - Name: suffix
                    Value: '.txt'
            Function: !GetAtt Textract2DDBUpdate.Arn
      Tags:
        - Key: Purpose
          Value: Extracted Ouput

  # Bucket policy for the output bucket.
  TextractDocumentOutputBucketPolicy:
    Type: AWS::S3::BucketPolicy
    DependsOn:
      - TextractDocumentOutput
    Properties:
      Bucket: !Ref TextractDocumentOutput
      PolicyDocument:
        Id: MyPolicy
        # Quoted so YAML loaders do not turn the policy version into a date.
        Version: '2012-10-17'
        Statement:
          - Sid: AllowWrite
            Effect: Allow
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action: 's3:PutObject*'
            Resource: !Sub 'arn:aws:s3:::${TextractDocumentOutput}/*'

  # Bucket holding the attachments extracted from incoming emails; Textract
  # reads its input documents from here.
  AttachmentsS3Bucket:
    Type: AWS::S3::Bucket
    DeletionPolicy: Retain
    Properties:
      BucketName: !Sub 'textract-demo-attachments-${AWS::AccountId}'
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      AccessControl: BucketOwnerFullControl
      LoggingConfiguration:
        DestinationBucketName:
          Fn::ImportValue: !Sub '${IncomingEmailStack}-S3BucketLogs'
        LogFilePrefix: !Sub '/logs/textract-demo-attachments-${AWS::Region}-${AWS::AccountId}/'
      PublicAccessBlockConfiguration:
        BlockPublicAcls: true
        BlockPublicPolicy: true
        IgnorePublicAcls: true
        # Matches the output bucket: all four public-access settings are on.
        RestrictPublicBuckets: true
      Tags:
        - Key: Purpose
          Value: Email Attachments

  # Bucket policy for the attachments bucket.
  AttachmentsBucketPolicy:
    Type: AWS::S3::BucketPolicy
    DependsOn:
      - AttachmentsS3Bucket
    Properties:
      Bucket: !Ref AttachmentsS3Bucket
      PolicyDocument:
        Id: MyPolicy
        # Quoted so YAML loaders do not turn the policy version into a date.
        Version: '2012-10-17'
        Statement:
          - Sid: AllowWrite
            Effect: Allow
            Principal:
              Service:
                - "lambda.amazonaws.com"
            Action:
              - s3:GetObject
              - s3:ListBucket
              - s3:PutObject
            Resource:
              - !Sub 'arn:aws:s3:::textract-demo-attachments-${AWS::AccountId}'
              - !Sub 'arn:aws:s3:::textract-demo-attachments-${AWS::AccountId}/*'

  # Execution role of the Textract2DDBUpdate function.
  TextractDDBRole:
    Type: AWS::IAM::Role
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W11
            reason: "No resource level permissions for ec2.DescribeNetworkInterfaces"
          - id: W58
            reason: "Lambda functions has permissions to write cloudwatch logs"
    DependsOn:
      - TableValuesDynamoTable
      - KeyValuesDynamoTable
    Properties:
      #RoleName:
      #  !Sub ${AWS::StackName}-lambda-textract-DDBRole
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: "TextracttoDBDataLambdaFunctionAccess-CWAccessPolicy"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: logs:CreateLogGroup
                Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*'
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource:
                  - !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/textract-demo-functions-DDBData:*'
        - PolicyName: "TextracttoDBDataLambdaFunctionAccess"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: dynamodb:PutItem
                Resource:
                  - !GetAtt TableValuesDynamoTable.Arn
                  - !GetAtt KeyValuesDynamoTable.Arn
              # Network-interface permissions used by VPC-attached functions.
              - Effect: Allow
                Action:
                  - ec2:DescribeNetworkInterfaces
                  - ec2:CreateNetworkInterface
                  - ec2:DeleteNetworkInterface
                  - ec2:DescribeInstances
                  - ec2:AttachNetworkInterface
                Resource: "*"
              - Effect: Allow
                Action:
                  - kms:generatedatakey
                  - kms:decrypt
                Resource:
                  Fn::ImportValue: KeyArn
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:ListBucket
                Resource:
                  - !Sub 'arn:aws:s3:::textract-demo-output-${AWS::AccountId}'
                  - !Sub 'arn:aws:s3:::textract-demo-output-${AWS::AccountId}/*'

  # Role assumed by the Textract service; passed to StartDocumentAnalysis as
  # the notification role so Textract can publish to the callback topic.
  TextractServiceRole:
    Type: AWS::IAM::Role
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W11
            reason: "No resource level permissions for ec2.DescribeNetworkInterfaces"
          - id: W58
            reason: "Lambda functions has permissions to write cloudwatch logs"
    Properties:
      #RoleName:
      #  !Sub ${AWS::StackName}-textract-service
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - textract.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: "TextractServiceRole-CWAccessPolicy"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: logs:CreateLogGroup
                Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*'
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource:
                  - !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/Textract-service-Functions:*'
        - PolicyName: "TextractServiceRoleLambdaFunctionAccess"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: sns:Publish
                Resource: !Ref TextractCallbackSNSTopic
              # The callback topic is KMS-encrypted, so publishing needs the key.
              - Effect: Allow
                Action:
                  - kms:generatedatakey
                  - kms:decrypt
                Resource:
                  Fn::ImportValue: KeyArn
              - Effect: Allow
                Action:
                  - ec2:DescribeNetworkInterfaces
                  - ec2:CreateNetworkInterface
                  - ec2:DeleteNetworkInterface
                  - ec2:DescribeInstances
                  - ec2:AttachNetworkInterface
                Resource: "*"

  # Execution role of the TextractCallback function.
  TextractCallbackRole:
    Type: AWS::IAM::Role
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W11
            reason: "No resource level permissions for ec2.DescribeNetworkInterfaces"
          - id: W58
            reason: "Lambda functions has permissions to write cloudwatch logs"
    Properties:
      #RoleName:
      #  !Sub ${AWS::StackName}-lambda-textract-callback
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: "TextractCallbackRole-CWAccessPolicy"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: logs:CreateLogGroup
                Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*'
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource:
                  # Must name the log group of the function that uses this
                  # role (FunctionName: textract-demo-functions-Callback),
                  # otherwise the function cannot write its log streams.
                  - !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/textract-demo-functions-Callback:*'
        - PolicyName: "TextractCallbackRoleLambdaFunctionAccess"
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: dynamodb:PutItem
                Resource: !GetAtt RawdataDynamoTable.Arn
              - Effect: Allow
                Action:
                  - kms:generatedatakey
                  - kms:decrypt
                Resource:
                  Fn::ImportValue: KeyArn
              - Effect: Allow
                Action:
                  - ec2:DescribeNetworkInterfaces
                  - ec2:CreateNetworkInterface
                  - ec2:DeleteNetworkInterface
                  - ec2:DescribeInstances
                  - ec2:AttachNetworkInterface
                Resource: "*"
              - Effect: Allow
                Action: textract:GetDocumentAnalysis
                Resource: '*'
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:PutObject
                Resource:
                  - !Sub 'arn:aws:s3:::textract-demo-output-${AWS::AccountId}'
                  - !Sub 'arn:aws:s3:::textract-demo-output-${AWS::AccountId}/*'

  # Execution role of the EmailParsing function.
  EmailParsingRole:
    Type: AWS::IAM::Role
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W11
            reason: "No resource level permissions for ec2.DescribeNetworkInterfaces"
    Properties:
      #RoleName:
      #  !Sub ${AWS::StackName}-lambda-email-parsing
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: EmailParsingLambdaFunctionAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action: dynamodb:PutItem
                Resource: !GetAtt EmailsDynamoTable.Arn
              - Effect: Allow
                Action: textract:StartDocumentAnalysis
                Resource: '*'
              - Effect: Allow
                Action:
                  - kms:generatedatakey
                  - kms:decrypt
                Resource:
                  Fn::ImportValue: KeyArn
              - Effect: Allow
                Action:
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:PutObject
                Resource:
                  - !Sub 'arn:aws:s3:::textract-demo-attachments-${AWS::AccountId}'
                  - !Sub 'arn:aws:s3:::textract-demo-attachments-${AWS::AccountId}/*'
              - Effect: Allow
                Action:
                  - ec2:DescribeNetworkInterfaces
                  - ec2:CreateNetworkInterface
                  - ec2:DeleteNetworkInterface
                  - ec2:DescribeInstances
                  - ec2:AttachNetworkInterface
                Resource: "*"
              # Read access to the raw emails bucket exported by the
              # incoming-email stack.
              - Effect: Allow
                Action: s3:GetObject
                Resource: !Sub
                  - '${arn}/*'
                  - arn:
                      Fn::ImportValue: !Sub '${IncomingEmailStack}-RawEmailsS3BucketARN'
              - Effect: Allow
                Action: logs:CreateLogGroup
                Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*'
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/textract-demo-functions-EmailParsing:*'

  # Lets the imported incoming-email SNS topic invoke EmailParsing.
  PermissionForIncomingEmailSNSTopicToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt EmailParsing.Arn
      Action: lambda:InvokeFunction
      Principal: sns.amazonaws.com
      SourceAccount: !Ref AWS::AccountId
      SourceArn:
        Fn::ImportValue: !Sub '${IncomingEmailStack}-IncomingEmailSNSTopic'

  # Parses a stored raw email, uploads its attachments and starts one
  # Textract analysis job per attachment.
  EmailParsing:
    DependsOn:
      - EmailsDynamoTable
    Type: AWS::Lambda::Function
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W92
            reason: "Customer will enable reservedconcurrentlimit based on their use case"
          - id: W58
            reason: "Lambda functions has permissions to write cloudwatch logs"
    Properties:
      FunctionName: textract-demo-functions-EmailParsing
      Handler: index.lambda_handler
      Timeout: 900
      MemorySize: 1024
      Runtime: python3.9
      Role: !GetAtt EmailParsingRole.Arn
      Code:
        ZipFile: |
          import email.parser
          import json
          import os
          from email import policy
          import re
          from datetime import datetime
          import boto3

          EMAILS_DYNAMO_TABLE = os.environ['EMAILS_DYNAMO_TABLE']
          ATTACHMENTS_BUCKET = os.environ['ATTACHMENTS_BUCKET']
          TEXTRACT_NOTIFICATION_TOPIC_ARN = \
              os.environ['TEXTRACT_NOTIFICATION_TOPIC_ARN']
          TEXTRACT_NOTIFICATION_ROLE_ARN = \
              os.environ['TEXTRACT_NOTIFICATION_ROLE_ARN']


          def lambda_handler(sns_payload, context):
              """Process one SNS-delivered email receipt notification.

              Reads the raw email from the S3 location named in the
              notification, copies every attachment to ATTACHMENTS_BUCKET,
              starts a Textract document analysis per attachment and stores
              one summary item in EMAILS_DYNAMO_TABLE.

              Returns a dict with the email id and the list of attachments
              (name, content type, S3 key and Textract job id).
              Raises Exception when the payload does not have exactly one
              record, has no receipt action, or the action type is not S3.
              """
              s3 = boto3.client('s3')
              textract = boto3.client('textract')
              dynamodb = boto3.client('dynamodb')

              if 'Records' not in sns_payload:
                  raise Exception('No Records section')
              if len(sns_payload['Records']) != 1:
                  raise Exception('Expected only 1 record')

              sns_message = sns_payload['Records'][0]['Sns']['Message']
              event = json.loads(sns_message)
              if 'action' not in event['receipt']:
                  raise Exception('Invalid event, expected action section')

              sourceemail = event['mail']['source']
              # When the sender contains '=' separators the third field is
              # used; senders with fewer fields are kept unchanged instead
              # of raising IndexError.
              source_parts = sourceemail.split('=')
              if len(source_parts) > 2:
                  sourceemail = source_parts[2]

              action = event['receipt']['action']
              actionType = action['type']
              if actionType != 'S3':
                  raise Exception('Expected action type to be S3, got: ' + actionType)

              bucket_name = action['bucketName']
              object_key = action['objectKey']
              email_id = object_key

              s3_raw_email = s3.get_object(Bucket=bucket_name, Key=object_key)
              raw_email_str = s3_raw_email['Body'].read().decode('utf-8')
              raw_email = \
                  email.parser.Parser(policy=policy.strict).parsestr(raw_email_str)

              source = 'Email from ' + sourceemail
              dt_string = datetime.now().strftime('%d/%m/%Y %H:%M:%S')

              attachment_index = 0
              attachments = []
              document_names = []
              for part in raw_email.walk():
                  if not part.is_attachment():
                      continue

                  # get_filename() returns None when the part has no filename.
                  filename = part.get_filename() or ''
                  safe_name = re.sub(r'[^a-zA-Z0-9]', '', filename)
                  # Fall back to the running index when no usable name is left.
                  attachment_id = safe_name or str(attachment_index)
                  attachment_index += 1

                  attachment_key = object_key + '/attachments/' + attachment_id
                  # The callback splits the tag on '_' into exactly two parts;
                  # Textract limits JobTag to 64 characters.
                  job_tag = (email_id[0:5] + '_' + attachment_id)[:64]

                  s3.put_object(Bucket=ATTACHMENTS_BUCKET,
                                Key=attachment_key,
                                Body=part.get_content())
                  document_names.append(filename or attachment_id)

                  response = textract.start_document_analysis(
                      DocumentLocation={'S3Object': {
                          'Bucket': ATTACHMENTS_BUCKET,
                          'Name': attachment_key}},
                      FeatureTypes=['FORMS', 'TABLES'],
                      JobTag=job_tag,
                      NotificationChannel={
                          'SNSTopicArn': TEXTRACT_NOTIFICATION_TOPIC_ARN,
                          'RoleArn': TEXTRACT_NOTIFICATION_ROLE_ARN})

                  attachments.append({
                      'attachment_id': filename,
                      'content_type': part.get_content_type(),
                      'key': attachment_key,
                      'textract_job_id': response['JobId'],
                  })

              item = {
                  'email_id': {'S': email_id},
                  # A missing Subject header yields None, which is not a
                  # valid DynamoDB string value.
                  'subject': {'S': str(raw_email['subject'] or '')},
                  'source': {'S': source},
                  # documentname is the table's range key and must not be
                  # empty, so emails without attachments get a placeholder.
                  'documentname': {'S': ','.join(document_names) or 'No Attachments'},
                  'timestamp': {'S': dt_string},
              }
              dynamodb.put_item(TableName=EMAILS_DYNAMO_TABLE,
                                Item=item,
                                ReturnValues='NONE')

              return {'email_id': email_id, 'attachments': attachments}
      Environment:
        Variables:
          EMAILS_DYNAMO_TABLE: !Ref EmailsDynamoTable
          ATTACHMENTS_BUCKET: !Ref AttachmentsS3Bucket
          TEXTRACT_NOTIFICATION_TOPIC_ARN: !Ref TextractCallbackSNSTopic
          TEXTRACT_NOTIFICATION_ROLE_ARN: !GetAtt TextractServiceRole.Arn
      VpcConfig:
        SecurityGroupIds:
          - !Ref EmailParsingSecurityGroup
        SubnetIds:
          - Fn::ImportValue: !Sub '${ComputeEnvStack}-LambdaSubnetA'
          - Fn::ImportValue: !Sub '${ComputeEnvStack}-LambdaSubnetB'

  # Security group of the EmailParsing function: HTTPS egress only.
  EmailParsingSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W5
            reason: "Customer will enable VPC endpoints for S3 and DynamoDB and update the Egress accordingly"
    Properties:
      #GroupName:
      #  !Sub ${AWS::StackName}-email-parsing
      # NOTE(review): the description text is identical to the one on
      # TextractCallbackSecurityGroup; it is left untouched because changing
      # GroupDescription replaces the security group.
      GroupDescription: Textract callback Lambda security group
      VpcId:
        Fn::ImportValue: !Sub '${ComputeEnvStack}-VPC'
      SecurityGroupEgress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: 0.0.0.0/0
          Description: "Lambda to call S3 DynamoDB Services"
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-email-parsing'

  # Subscribes EmailParsing to the imported incoming-email topic.
  IncomingEmailSNSTopicSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Endpoint: !GetAtt EmailParsing.Arn
      Protocol: lambda
      TopicArn:
        Fn::ImportValue: !Sub '${IncomingEmailStack}-IncomingEmailSNSTopic'

  # KMS-encrypted topic Textract publishes job-completion messages to.
  TextractCallbackSNSTopic:
    Type: AWS::SNS::Topic
    Properties:
      DisplayName: !Sub '${AWS::StackName}-textract-callback'
      TopicName: !Sub '${AWS::StackName}-textract-callback'
      KmsMasterKeyId:
        Fn::ImportValue: KeyAliasName
      Subscription:
        - Protocol: lambda
          Endpoint: !GetAtt TextractCallback.Arn

  # Access policy of the callback topic.
  TextractCallbackSNSTopicPolicy:
    Type: AWS::SNS::TopicPolicy
    Properties:
      Topics:
        - !Ref TextractCallbackSNSTopic
      PolicyDocument:
        Id: TextractCallbackSNSTopicPolicy
        Version: '2012-10-17'
        Statement:
          - Sid: S3TriggerAccess
            Effect: Allow
            Principal:
              Service:
                - 'lambda.amazonaws.com'
            Action:
              - 'sns:Publish'
              - 'SNS:GetTopicAttributes'
              - 'SNS:SetTopicAttributes'
              - 'SNS:AddPermission'
              - 'sns:RemovePermission'
              - 'SNS:DeleteTopic'
              - 'SNS:Subscribe'
              - 'SNS:ListSubscriptionsByTopic'
            Resource:
              - !Ref TextractCallbackSNSTopic
            Condition:
              StringEquals:
                'AWS:SourceAccount': !Ref 'AWS::AccountId'
              ArnLike:
                # aws:SourceArn is compared against an ARN; Ref on a bucket
                # returns only its name, so the ARN attribute is used.
                aws:SourceArn: !GetAtt TextractDocumentOutput.Arn

  # Lets the callback topic invoke TextractCallback.
  PermissionForTextractCallbackToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt TextractCallback.Arn
      Action: lambda:InvokeFunction
      Principal: sns.amazonaws.com
      SourceAccount: !Ref AWS::AccountId
      SourceArn: !Ref TextractCallbackSNSTopic

  # One item per processed email (written by EmailParsing).
  EmailsDynamoTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: email_id
          AttributeType: S
        - AttributeName: documentname
          AttributeType: S
      BillingMode: PAY_PER_REQUEST
      SSESpecification:
        KMSMasterKeyId:
          Fn::ImportValue: KeyAliasName
        SSEEnabled: true
        SSEType: KMS
      KeySchema:
        - AttributeName: email_id
          KeyType: HASH
        - AttributeName: documentname
          KeyType: RANGE
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      #TableName:
      #  !Sub ${AWS::StackName}-emails-${AWS::AccountId}

  # Raw extracted text per attachment (written by TextractCallback).
  RawdataDynamoTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: email_id
          AttributeType: S
        - AttributeName: documentname
          AttributeType: S
      BillingMode: PAY_PER_REQUEST
      SSESpecification:
        KMSMasterKeyId:
          Fn::ImportValue: KeyAliasName
        SSEEnabled: true
        SSEType: KMS
      KeySchema:
        - AttributeName: email_id
          KeyType: HASH
        - AttributeName: documentname
          KeyType: RANGE
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      #TableName:
      #  !Sub ${AWS::StackName}-rawdata-${AWS::AccountId}

  # Table rows extracted from documents (written by Textract2DDBUpdate).
  TableValuesDynamoTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: email_id
          AttributeType: S
        - AttributeName: documentname
          AttributeType: S
      BillingMode: PAY_PER_REQUEST
      SSESpecification:
        KMSMasterKeyId:
          Fn::ImportValue: KeyAliasName
        SSEEnabled: true
        SSEType: KMS
      KeySchema:
        - AttributeName: email_id
          KeyType: HASH
        - AttributeName: documentname
          KeyType: RANGE
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      #TableName:
      #  !Sub ${AWS::StackName}-tablevaluesdata-${AWS::AccountId}

  # Form key/value pairs extracted from documents (written by
  # Textract2DDBUpdate).
  KeyValuesDynamoTable:
    Type: AWS::DynamoDB::Table
    Properties:
      AttributeDefinitions:
        - AttributeName: email_id
          AttributeType: S
        - AttributeName: documentname
          AttributeType: S
      BillingMode: PAY_PER_REQUEST
      SSESpecification:
        KMSMasterKeyId:
          Fn::ImportValue: KeyAliasName
        SSEEnabled: true
        SSEType: KMS
      KeySchema:
        - AttributeName: email_id
          KeyType: HASH
        - AttributeName: documentname
          KeyType: RANGE
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      #TableName:
      #  !Sub ${AWS::StackName}-keyvaluesdata-${AWS::AccountId}

  # Fetches the finished Textract analysis and stores key/values, tables and
  # raw text.
  TextractCallback:
    DependsOn:
      - RawdataDynamoTable
    Type: AWS::Lambda::Function
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W92
            reason: "Customer will enable reservedconcurrentlimit based on their use case"
          - id: W58
            reason: "Lambda functions has permissions to write cloudwatch logs"
    Properties:
      FunctionName: textract-demo-functions-Callback
      Handler: index.lambda_handler
      Timeout: 900
      MemorySize: 1024
      Runtime: python3.9
      Code:
        ZipFile: |
          import json
          import boto3
          import os
          from datetime import datetime
          from collections import defaultdict

          RAWDATA_DYNAMO_TABLE = os.environ['RAWDATA_DYNAMO_TABLE']


          def lambda_handler(sns_payload, context):
              """Handle one Textract job-completion notification from SNS.

              Downloads every result page of the finished analysis, writes
              keyvalues.txt, tablevalues.txt and rawtext.json below the
              document's key in the output bucket (each only when
              non-empty) and stores the raw text in RAWDATA_DYNAMO_TABLE.

              Returns {'text': <raw text>}.
              Raises Exception for malformed payloads or job tags, for a
              different API than StartDocumentAnalysis, and for any status
              other than SUCCEEDED.
              """
              print(json.dumps(sns_payload))
              if 'Records' not in sns_payload:
                  raise Exception('No Records section')
              if len(sns_payload['Records']) != 1:
                  raise Exception('Expected only 1 record')

              sns_message = sns_payload['Records'][0]['Sns']['Message']
              event = json.loads(sns_message)

              job_tag = event['JobTag']
              split_job_tag = job_tag.split('_')
              if len(split_job_tag) != 2:
                  raise Exception('Invalid job tag, expected [email_id]/[attachment_id], got: ' + job_tag)
              email_id = split_job_tag[0]
              attachment_id = split_job_tag[1]

              textract = boto3.client('textract')
              s3 = boto3.client('s3')
              output_bucket = os.environ["OUT_PUT_S3_BUCKET"]

              if event['API'] != 'StartDocumentAnalysis':
                  raise Exception('Expected API to be StartDocumentAnalysis')
              if event['Status'] != 'SUCCEEDED':
                  raise Exception('Content detection failed, got status: ' + event['Status'])

              # The analysed document's S3 key doubles as the output prefix.
              jobId = event['DocumentLocation']['S3ObjectName']

              # Collect the block list of every result page.
              blocks = []
              next_token = None
              while True:
                  request = {'JobId': event['JobId'], 'MaxResults': 1000}
                  if next_token is not None:
                      request['NextToken'] = next_token
                  response = textract.get_document_analysis(**request)
                  blocks.append(response['Blocks'])
                  next_token = response.get('NextToken')
                  if next_token is None:
                      break

              key_map, value_map, block_map, tablecsvdata, rawtexttextract = get_kv_map(blocks)
              kvs = get_kv_relationship(key_map, value_map, block_map)

              if len(kvs):
                  # Store processed key/value data in the output bucket
                  s3.put_object(
                      Bucket=output_bucket,
                      Key=jobId + "/keyvalues.txt",
                      Body="[" + json.dumps(kvs) + "]"
                  )
              else:
                  print("The Key Values Data is empty")

              if len(tablecsvdata):
                  # Store processed table data in the output bucket
                  s3.put_object(
                      Bucket=output_bucket,
                      Key=jobId + "/tablevalues.txt",
                      Body="[" + tablecsvdata + "]"
                  )
              else:
                  print("The Table values data is empty")

              if len(rawtexttextract):
                  # Store raw text in the output bucket
                  s3.put_object(
                      Bucket=output_bucket,
                      Key=jobId + "/rawtext.json",
                      Body=rawtexttextract
                  )
              else:
                  print("The rawtexttextract data is empty")

              dynamodb = boto3.client('dynamodb')
              dt_string = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
              item = {
                  "email_id": {"S": email_id},
                  "documentname": {"S": attachment_id},
                  "content": {"S": rawtexttextract},
                  "timestamp": {"S": dt_string}
              }
              dynamodb.put_item(
                  TableName=RAWDATA_DYNAMO_TABLE,
                  Item=item,
                  ReturnValues='NONE'
              )
              return {'text': rawtexttextract}


          def get_kv_map(blocks):
              """Index the Textract blocks of all result pages.

              blocks is a list of pages, each a list of block dicts.
              Returns (key_map, value_map, block_map, tables_json, rawtext):
              KEY and VALUE blocks by id, all blocks by id, the
              comma-separated JSON objects of all tables and the text of
              all LINE blocks joined with newlines.
              """
              key_map = {}
              value_map = {}
              block_map = {}
              table_blocks = []
              rawtext = ""
              for page in blocks:
                  for block in page:
                      block_id = block['Id']
                      block_map[block_id] = block
                      block_type = block['BlockType']
                      if block_type == "KEY_VALUE_SET":
                          if 'KEY' in block['EntityTypes']:
                              key_map[block_id] = block
                          else:
                              value_map[block_id] = block
                      elif block_type == "TABLE":
                          table_blocks.append(block)
                      elif block_type == 'LINE':
                          rawtext += block['Text'] + "\n"

              if len(table_blocks) <= 0:
                  print("NO Table FOUND")
              # Join per-table JSON explicitly so cell text containing '}{'
              # cannot be mistaken for a table boundary.
              csv = ','.join(
                  generate_table_csv(table, block_map, index + 1)
                  for index, table in enumerate(table_blocks))
              return key_map, value_map, block_map, csv, rawtext


          def get_kv_relationship(key_map, value_map, block_map):
              """Map each form key text to the list of its value texts."""
              kvs = defaultdict(list)
              for block_id, key_block in key_map.items():
                  value_block = find_value_block(key_block, value_map)
                  key = get_text(key_block, block_map)
                  if len(key) <= 0:
                      key = 'No Key'
                  val = get_text(value_block, block_map)
                  if len(val) <= 0:
                      val = 'No Value'
                  kvs[key].append(val)
              return kvs


          def find_value_block(key_block, value_map):
              """Return the VALUE block linked to key_block, or None."""
              # Initialised so a key without a VALUE relationship yields
              # None instead of an unbound local.
              value_block = None
              for relationship in key_block.get('Relationships', []):
                  if relationship['Type'] == 'VALUE':
                      for value_id in relationship['Ids']:
                          value_block = value_map[value_id]
              return value_block


          def get_text(result, blocks_map):
              """Concatenate the WORD / SELECTION_ELEMENT children of a block.

              Returns '' when result is None or has no relationships.
              """
              text = ''
              if result and 'Relationships' in result:
                  for relationship in result['Relationships']:
                      if relationship['Type'] == 'CHILD':
                          for child_id in relationship['Ids']:
                              word = blocks_map[child_id]
                              if word['BlockType'] == 'WORD':
                                  text += word['Text'] + ' '
                              if word['BlockType'] == 'SELECTION_ELEMENT':
                                  if word['SelectionStatus'] == 'SELECTED':
                                      text += 'SELECTED' + '/'
                                  else:
                                      text += 'NOT_SELECTED' + '/'
              text = text.replace('NOT_SELECTED/SELECTED/', 'NOT_SELECTED/')
              return text


          def generate_table_csv(table_result, blocks_map, table_index):
              """Return one table as a JSON object of rows -> columns -> text."""
              print("in generate_table_csv")
              rows = get_rows_columns_map(table_result, blocks_map)
              return json.dumps(rows)


          def get_rows_columns_map(table_result, blocks_map):
              """Build {row_index: {col_index: cell text}} for a TABLE block."""
              rows = {}
              print("in get_rows_columns_map")
              for relationship in table_result['Relationships']:
                  if relationship['Type'] == 'CHILD':
                      for child_id in relationship['Ids']:
                          cell = blocks_map[child_id]
                          if cell['BlockType'] == 'CELL':
                              row_index = cell['RowIndex']
                              col_index = cell['ColumnIndex']
                              if row_index not in rows:
                                  # create new row
                                  rows[row_index] = {}
                              # get the text value
                              rows[row_index][col_index] = get_text(cell, blocks_map)
              return rows
      Role: !GetAtt TextractCallbackRole.Arn
      Environment:
        Variables:
          RAWDATA_DYNAMO_TABLE: !Ref RawdataDynamoTable
          OUT_PUT_S3_BUCKET: !Sub 'textract-demo-output-${AWS::AccountId}'
      VpcConfig:
        SecurityGroupIds:
          - !Ref TextractCallbackSecurityGroup
        SubnetIds:
          - Fn::ImportValue: !Sub '${ComputeEnvStack}-LambdaSubnetA'
          - Fn::ImportValue: !Sub '${ComputeEnvStack}-LambdaSubnetB'

  # Security group of the TextractCallback function: HTTPS egress only.
  TextractCallbackSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W5
            reason: "Customer will enable VPC endpoints for S3 and DynamoDB and update the Egress accordingly"
    Properties:
      #GroupName:
      #  !Sub ${AWS::StackName}-textract-callback
      GroupDescription: Textract callback Lambda security group
      VpcId:
        Fn::ImportValue: !Sub '${ComputeEnvStack}-VPC'
      SecurityGroupEgress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: 0.0.0.0/0
          Description: "Lambda to call S3 DynamoDB Services"
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-textract-callback'

  # Loads the JSON written by TextractCallback into DynamoDB.
  Textract2DDBUpdate:
    DependsOn:
      - TableValuesDynamoTable
      - KeyValuesDynamoTable
    Type: AWS::Lambda::Function
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W92
            reason: "Customer will enable reservedconcurrentlimit based on their use case"
          - id: W58
            reason: "Lambda functions has permissions to write cloudwatch logs"
    Properties:
      FunctionName: textract-demo-functions-DDBData
      Handler: index.lambda_handler
      Timeout: 900
      MemorySize: 1024
      Runtime: python3.9
      Code:
        ZipFile: |
          import boto3
          import json
          import os
          from datetime import datetime
          from decimal import Decimal
          from urllib.parse import unquote_plus

          s3_client = boto3.client('s3')
          dynamodb = boto3.resource('dynamodb')


          def lambda_handler(event, context):
              """Copy one keyvalues/tablevalues JSON object into DynamoDB.

              Triggered by an S3 object-created event. Keys containing
              "keyvalues" go to the Keyvalues_DBTable table, every other key
              to Tablevalues_DBTable. The first and third '/'-separated key
              segments are used as email id and document name.
              """
              keyvalues_dbtable = os.environ["Keyvalues_DBTable"]
              tablevalues_dbtable = os.environ["Tablevalues_DBTable"]
              print(event)

              record = event['Records'][0]['s3']
              bucket = record['bucket']['name']
              print(bucket)
              # Object keys arrive URL-encoded in S3 event notifications.
              json_file_name = unquote_plus(record['object']['key'])
              print(json_file_name)

              if "keyvalues" in json_file_name:
                  table_name = keyvalues_dbtable
              else:
                  table_name = tablevalues_dbtable

              json_object = s3_client.get_object(Bucket=bucket, Key=json_file_name)
              my_json = json_object['Body'].read().decode('utf8').replace("'", '')
              # DynamoDB does not accept Python floats, hence Decimal.
              jsonDict = json.loads(my_json, parse_float=Decimal)

              dt_string = datetime.now().strftime("%d/%m/%Y %H:%M:%S")
              key_segments = json_file_name.split('/')
              email_id = key_segments[0]
              document_name = key_segments[2]
              table = dynamodb.Table(table_name)

              if "tablevalues" in json_file_name:
                  for table_rows in jsonDict:
                      for row in table_rows.values():
                          values1 = list(row.values())
                          # Rows whose first cell is empty are skipped; the
                          # first cell becomes part of the range key.
                          if values1 and values1[0]:
                              table.put_item(Item={
                                  'email_id': email_id,
                                  'documentname': document_name + "/" + values1[0],
                                  'Table_Values': values1,
                              })
              else:
                  for keyvaluesdata in jsonDict:
                      keyvaluesdata['email_id'] = email_id
                      keyvaluesdata['documentname'] = document_name
                      keyvaluesdata['timestamp'] = dt_string
                      table.put_item(Item=keyvaluesdata)
      Role: !GetAtt TextractDDBRole.Arn
      Environment:
        Variables:
          Keyvalues_DBTable: !Ref KeyValuesDynamoTable
          Tablevalues_DBTable: !Ref TableValuesDynamoTable
      VpcConfig:
        SecurityGroupIds:
          - !Ref TextractDDBDataSecurityGroup
        SubnetIds:
          - Fn::ImportValue: !Sub '${ComputeEnvStack}-LambdaSubnetA'
          - Fn::ImportValue: !Sub '${ComputeEnvStack}-LambdaSubnetB'

  # Security group of the Textract2DDBUpdate function: HTTPS egress only.
  TextractDDBDataSecurityGroup:
    Type: AWS::EC2::SecurityGroup
    Metadata:
      cfn_nag:
        rules_to_suppress:
          - id: W5
            reason: "Customer will enable VPC endpoints for S3 and DynamoDB and update the Egress accordingly"
    Properties:
      #GroupName:
      #  !Sub ${AWS::StackName}-textract-DDBData
      GroupDescription: Textract to DDB Lambda security group
      VpcId:
        Fn::ImportValue: !Sub '${ComputeEnvStack}-VPC'
      SecurityGroupEgress:
        - IpProtocol: tcp
          FromPort: 443
          ToPort: 443
          CidrIp: 0.0.0.0/0
          Description: "Lambda to call S3 DynamoDB Services"
      Tags:
        - Key: Name
          Value: !Sub '${AWS::StackName}-textract-DDBData'

  # Lets the output bucket invoke Textract2DDBUpdate. The bucket ARN is built
  # from its name rather than referenced, because the bucket itself depends
  # on this permission.
  PermissionForTextract2DDBUpdateToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt Textract2DDBUpdate.Arn
      Action: lambda:InvokeFunction
      Principal: s3.amazonaws.com
      SourceAccount: !Ref AWS::AccountId
      SourceArn: !Sub "arn:aws:s3:::textract-demo-output-${AWS::AccountId}"

Parameters:
  # Both parameters are stack names used as prefixes of imported exports.
  IncomingEmailStack:
    Type: String
    Description: Name of the parent stack for incoming emails
  ComputeEnvStack:
    Type: String
    Description: Name of the parent stack for the compute environment

Outputs:
  EmailParsingFunction:
    Value: !GetAtt EmailParsing.Arn
    Export:
      Name: !Sub "${AWS::StackName}-EmailParsingFunction"
  TextractCallbackFunction:
    Value: !GetAtt TextractCallback.Arn
    Export:
      Name: !Sub "${AWS::StackName}-TextractCallbackFunction"
  TextractDocumentOutput:
    Description: S3 Bucket ARN where extracted content is uploaded
    Value: !GetAtt TextractDocumentOutput.Arn
    Export:
      Name: !Sub "${AWS::StackName}-TextractDocumentOutput"
  Textract2DDBUpdateFunction:
    Value: !GetAtt Textract2DDBUpdate.Arn
    Export:
      Name: !Sub "${AWS::StackName}-Textract2DDBUpdateFunction"