import boto3
import json
import requests
from elasticsearch import Elasticsearch, RequestsHttpConnection
from requests_aws4auth import AWS4Auth
import os
from dateutil import parser

# Name of the Elasticsearch index every event document is written to.
INDEX_NAME = 'events'

# Mapping (schema) and settings used when the ES index has to be created.
indexDoc = {
    "mappings": {
        "properties": {
            "EVENT_DATE": {"type": "date"},
            "SN_WAR": {"type": "keyword"},
            "API_WELL_NUMBER": {"type": "keyword"},
            "DEPTH": {"type": "long"},
            "LOCATION": {"type": "geo_point"},
            "DAILY_REMARK": {"type": "text"},
            "EVENT_TYPE": {"type": "text"},
            "EVENT_TEXT": {"type": "text"},
            "EVENT_SCORE": {"type": "text"},
        }
    },
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
    },
}


def createIndex(esClient):
    """Create the events index with the ``indexDoc`` mapping if it does not exist.

    Returns 1 when this call created the index, otherwise None.
    Any client error is logged and re-raised, so the Lambda invocation is
    reported as failed (previously the process was killed via ``exit(4)``).
    """
    try:
        print("Creating Index")
        # `not` rather than `is False` so truthy/falsy response objects work too.
        if not esClient.indices.exists(INDEX_NAME):
            esClient.indices.create(INDEX_NAME, body=indexDoc)
            print('Created')
            return 1
        return None
    except Exception as E:
        print("Unable to Create Index {0}".format(INDEX_NAME))
        print(E)
        raise


def _to_day(value):
    """Parse a free-form date string and return it formatted as YYYY-MM-DD."""
    return parser.parse(value).strftime("%Y-%m-%d")


def _is_missing_date(value):
    """True when *value* carries no usable date: None, empty, or NaN (string or float)."""
    # str(float('nan')) == 'nan', so this also covers NaN produced by json.loads.
    return value is None or str(value).strip().lower() in ('', 'nan')


def indexDocElement(esClient, response):
    """Normalise the dates in one event record and index it into INDEX_NAME.

    Parameters
    ----------
    esClient : Elasticsearch
        Connected client.
    response : dict
        Decoded message body. Keys read: 'remediation_date', 'event_date',
        'sn_war', 'api_well_number', 'depth', 'location', 'daily_remark',
        'event_type', 'event_keywords', 'event_score'. 'api_well_number' is
        indexed via its 'S' sub-key -- presumably a DynamoDB-style attribute
        value; confirm against the message producer.

    Side effects: ``response`` is mutated in place; 'remediation_date' and
    'event_date' are rewritten as YYYY-MM-DD strings. When 'event_date' is
    missing/empty/'nan' it falls back to the remediation date.

    Returns the client's index response. Any error is logged and re-raised
    (previously ``exit(5)``, which bypassed the caller's exception handler).
    """
    try:
        print("Indexing Document")
        response['remediation_date'] = _to_day(response['remediation_date'])

        raw_event_date = response.get('event_date')
        if _is_missing_date(raw_event_date):
            response['event_date'] = response['remediation_date']
        else:
            # Bug fix: the parsed event date is stored, not the remediation date.
            response['event_date'] = _to_day(raw_event_date)
        print("New Event Date: " + response['event_date'])
        print(response['api_well_number'])

        document = {
            'EVENT_DATE': response['event_date'],
            'SN_WAR': response['sn_war'],
            'API_WELL_NUMBER': response['api_well_number']['S'],
            'DEPTH': response['depth'],
            'LOCATION': response['location'],
            'DAILY_REMARK': response['daily_remark'],
            'EVENT_TYPE': response['event_type'],
            'EVENT_TEXT': response['event_keywords'],
            'EVENT_SCORE': response['event_score'],
        }
        # No explicit id is passed, so Elasticsearch assigns one per call.
        retval = esClient.index(index=INDEX_NAME, body=document)
        print(retval)
        return retval
    except Exception as E:
        print("Document not indexed")
        print("Error: ", E)
        raise


def lambda_handler(event, context):
    """AWS Lambda entry point: index every message in ``event['Records']``.

    Reads the 'es_endpoint' and 'Region' environment variables, connects to
    the (SigV4-signed) Elasticsearch domain, ensures the index exists, then
    JSON-decodes each record's 'body' and indexes it. Previously only the
    first record was processed and the rest of the batch was silently dropped.

    Any failure is re-raised so the invocation is marked failed. Records
    indexed earlier in the same invocation may be indexed again if the batch
    is retried (documents have auto-generated ids).
    """
    es_endpoint = os.environ['es_endpoint']
    region = os.environ['Region']
    service = 'es'
    credentials = boto3.Session().get_credentials()
    awsauth = AWS4Auth(credentials.access_key, credentials.secret_key,
                       region, service, session_token=credentials.token)
    try:
        esClient = Elasticsearch(
            hosts=es_endpoint,
            port=443,
            http_auth=awsauth,
            use_ssl=True,
            verify_certs=True,
            connection_class=RequestsHttpConnection)
        print(esClient.info())
    except Exception as E:
        print("Unable to connect to {0}".format(es_endpoint))
        print(E)
        raise

    createIndex(esClient)

    for record in event['Records']:
        events = json.loads(record['body'])
        print('Events: ', events)
        try:
            indexDocElement(esClient, events)
        except Exception as e:
            print(e)
            print('Error inserting into Elasticsearch. Please check Cloudwatch Logs.')
            raise