import csv
import json
import os
import tarfile

import boto3


def lambda_handler(event, context):
    """Read the output of a completed Amazon Comprehend entities job, join the
    results with the original source file and related DynamoDB records, and send
    one SQS message per detected event."""
    # S3 client for downloads
    s3client = boto3.client('s3')
    # DynamoDB client for the batch job lookup tables
    dynamodb = boto3.client('dynamodb')
    # SQS queue that downstream consumers read from
    sqs = boto3.resource('sqs')
    queue = sqs.get_queue_by_name(QueueName=os.environ['QueueName'])

    # Bucket and key of the Comprehend output object that triggered this Lambda
    input_bucket = event['Records'][0]['s3']['bucket']['name']
    input_key = event['Records'][0]['s3']['object']['key']

    # Extract the file name from the key and download the archive to /tmp
    input_file = input_key.split('/')[-1]
    input_file_path = '/tmp/' + input_file
    s3client.download_file(input_bucket, input_key, input_file_path)

    os.chdir('/tmp/')
    input_file = 'output.tar.gz'

    # Open and unpack the Comprehend output archive
    try:
        with tarfile.open(input_file) as tar:
            tar.extractall()
    except Exception:
        print('Could not extract the Amazon Comprehend tar file')
        raise

    print('OS File List: ' + str(os.listdir('/tmp/')))

    # Read each line of the Comprehend job output, keep every result that contains
    # at least one entity, and record which source file the job ran against.
    event_list = []
    original_file = None
    with open('output', 'r') as comprehend_output:
        for line in comprehend_output:
            # Each output line is a single JSON document
            data = json.loads(line)
            if 'File' in data:
                original_file = data['File']
            if data.get('Entities'):
                event_list.append(data)

    print('Source File: ' + str(original_file))

    # Download the original file that Comprehend analyzed
    original_key = 'model-input/' + original_file
    original_bucket = input_bucket
    original_file_path = '/tmp/' + original_file
    s3client.download_file(original_bucket, original_key, original_file_path)
    print('New OS File List, should have everything: ' + str(os.listdir('/tmp/')))

    # Read the original CSV into a list of rows so events can be matched by line number
    csv_file_array = []
    with open(original_file, errors='ignore') as csvfile:
        for rownum, row in enumerate(csv.reader(csvfile), start=1):
            try:
                csv_file_array.append(row)
            except Exception:
                print('Failed to read row: ' + str(rownum))
                continue

    print(len(csv_file_array))

    # Iterate over every event that Comprehend actually found
    print('Number of events: ' + str(len(event_list)))
    sent = 0
    for found_event in event_list:
        # "Line" is the line number in the source file where the entities were found
        try:
            record = csv_file_array[found_event['Line']]
        except (IndexError, KeyError):
            print(found_event)
            continue
        event_type_list = found_event['Entities']

        # Call DynamoDB to get the DEPTH and EVENT_DATE records for this SN_WAR
        response_depth = None
        response_main = None
        response_locations = None
        try:
            response_depth = dynamodb.get_item(
                TableName=os.environ['DynamoMvWarMainProp'],
                Key={'SN_WAR': {'S': str(record[0])}}
            )
        except Exception:
            print('SN WAR ' + str(record[0]) + ' not found in MvWarMainProp table')

        try:
            response_main = dynamodb.get_item(
                TableName=os.environ['DynamoMvWarMain'],
                Key={'SN_WAR': {'S': str(record[0])}}
            )
        except Exception:
            print('SN WAR ' + str(record[0]) + ' not found in MvWarMain table')

        # Call DynamoDB to get the well location by API_WELL_NUMBER
        try:
            response_locations = dynamodb.get_item(
                TableName='BSEE-WellLocations',
                Key={'API_WELL_NUMBER': {'S': str(response_main['Item']['API_WELL_NUMBER']['S'])}}
            )
        except Exception:
            print('API well number was not found in the MvWarMain table or in BSEE-WellLocations')

        response_snwar_lines = record[1]

        # Construct and send the message to SQS; skip the event if any required
        # field is missing from the DynamoDB responses.
        try:
            body = {
                'event_date': response_depth['Item']['TOTAL_DEPTH_DATE']['S'],
                'remediation_date': response_depth['Item']['WELL_ACTV_START_DT']['S'],
                'sn_war': record[0],
                'api_well_number': response_main['Item']['API_WELL_NUMBER']['S'],
                'depth': response_depth['Item']['DRILLING_MD']['S'],
                'location': response_locations['Item']['SURF_LATITUDE']['S'] + ','
                            + response_locations['Item']['SURF_LONGITUDE']['S'],
                'daily_remark': response_snwar_lines,
                'event_type': str(event_type_list[0]['Type']),
                'event_keywords': str(event_type_list[0]['Text']),
                'event_score': str(event_type_list[0]['Score'])
            }
        except Exception:
            print('Could not construct SQS body, missing critical information')
            continue

        try:
            queue.send_message(MessageBody=json.dumps(body))
            sent += 1
        except Exception:
            continue

    print('Number of events sent: ' + str(sent))
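

# A minimal local-invocation sketch, not part of the Lambda deployment. It assumes the
# QueueName, DynamoMvWarMainProp, and DynamoMvWarMain environment variables are set and
# that AWS credentials are available; the bucket and key below are hypothetical and only
# illustrate the S3 event shape the handler reads.
if __name__ == '__main__':
    sample_event = {
        'Records': [
            {
                's3': {
                    'bucket': {'name': 'example-comprehend-output-bucket'},   # hypothetical bucket
                    'object': {'key': 'comprehend-output/output.tar.gz'}      # hypothetical key
                }
            }
        ]
    }
    lambda_handler(sample_event, None)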