"""AWS Lambda handler that bulk-loads CSV exports from S3 into Amazon Neptune."""
import json
import boto3
import datetime
import time
import os
import urllib3

s3client = boto3.client('s3')
http = urllib3.PoolManager()


def gen_surl(bucketname, keyname):
    """Return a presigned GET URL for an S3 object."""
    url = s3client.generate_presigned_url(
        ClientMethod='get_object',
        Params={'Bucket': bucketname, 'Key': keyname})
    return url


def b_putpriv(bucket, key, content_type, body):
    """Write an object to S3 with a private ACL."""
    srep = s3client.put_object(
        ACL='private',
        Body=body,
        Bucket=bucket,
        Key=key,
        ContentType=content_type)
    return srep


def b_get_obj(bucket, bkey):
    """Read an S3 object and return its body as UTF-8 text."""
    obj = s3client.get_object(Bucket=bucket, Key=bkey)
    return obj['Body'].read().decode('utf-8')


def httpget(ep):
    """Issue a GET request and return the response body as text."""
    _res = http.request('GET', ep)
    return _res.data.decode('utf-8')


def loadfiles(filelist):
    """POST each prepared load request to the Neptune bulk loader endpoint."""
    _ret = []
    _headers = {'Content-type': 'application/json'}
    for _file in filelist:
        _ep = _file['endpoint']
        print(f"loading {_file['data']['source']}")
        _data = json.dumps(_file['data'])
        r = http.request('POST', _ep, headers=_headers, body=_data)
        _ret.append(json.loads(r.data.decode('utf-8')))
    return _ret


def build_load_request(endpoint, bucket, key, role, region):
    """Build a Neptune bulk-loader request for a single CSV file in S3."""
    return {
        'endpoint': f'https://{endpoint}:8182/loader',
        'data': {
            'source': f's3://{bucket}/{key}',
            'iamRoleArn': role,
            'mode': 'AUTO',
            'format': 'csv',
            'region': region,
            'failOnError': 'FALSE',
            'parallelism': 'MEDIUM',
            'updateSingleCardinalityProperties': 'FALSE',
            'queueRequest': 'TRUE'}}


def process_files(event):
    """Queue a bulk load for every CSV file under today's S3 prefix."""
    current_date = str(datetime.date.today())
    bucket = os.environ['NeptuneBucket']
    region = os.environ['AWS_REGION']
    endpoint = os.environ['NEPTUNE_CLUSTER_ENDPOINT']
    neptune_role = os.environ['NeptuneClusterRole']
    # S3 prefix where the day's export files land (YYYY-MM-DD)
    _fprefix = f'filestolaod/{current_date}/'
    s3list = s3getlist(bucket, _fprefix)
    _nepload = [build_load_request(endpoint, bucket, key, neptune_role, region)
                for key in s3list]
    loadresults = {}
    if _nepload:
        # dry run: print the prepared requests; uncomment to submit them
        # loadresults = loadfiles(_nepload)
        print(json.dumps(_nepload))
    return loadresults


def process_files_event(event):
    """Queue a bulk load for the single file named in an S3 event."""
    # wait for S3 to sync
    time.sleep(5)
    bucket = os.environ['NeptuneBucket']
    file = event['Records'][0]['s3']['object']['key']
    region = os.environ['AWS_REGION']
    endpoint = os.environ['NEPTUNE_CLUSTER_ENDPOINT']
    neptune_role = os.environ['NeptuneClusterRole']
    _nepload = [build_load_request(endpoint, bucket, file, neptune_role, region)]
    loadresults = loadfiles(_nepload)
    print(json.dumps(loadresults))
    return loadresults


def s3getlist(bucket, prefix):
    """List all object keys under a prefix, skipping metadata files."""
    keys = []
    paginator = s3client.get_paginator('list_objects_v2')
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        for ckey in page.get('Contents', []):
            if 'metadata' in ckey['Key']:
                continue
            keys.append(ckey['Key'])
    return keys

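
# --- Example (sketch, not part of the original handler) ---------------------
# Each POST that loadfiles() sends returns a payload containing a loadId.
# Neptune exposes a status endpoint (GET /loader/{loadId}) whose response
# carries the job's overallStatus. The helper below is an illustrative way
# to poll it; the function name, attempt count, and interval are assumptions.
def poll_load_status(endpoint, load_id, attempts=10, interval=15):
    """Poll the Neptune loader status API until the load leaves the queue."""
    _ep = f'https://{endpoint}:8182/loader/{load_id}'
    _res = {}
    for _ in range(attempts):
        _res = json.loads(httpget(_ep))
        status = _res.get('payload', {}).get('overallStatus', {}).get('status')
        print(f'{load_id}: {status}')
        # anything other than queued/in-progress is a terminal state
        if status not in (None, 'LOAD_NOT_STARTED', 'LOAD_IN_PROGRESS'):
            break
        time.sleep(interval)
    return _res
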

def neptunedbreset(event):
    """Fast-reset the Neptune database via the two-step /system endpoint."""
    endpoint = os.environ['NEPTUNE_CLUSTER_ENDPOINT']
    _ep = f'https://{endpoint}:8182/system'
    _headers = {'Content-type': 'application/json'}
    # step 1: request a reset token
    _datagtok = {'action': 'initiateDatabaseReset'}
    _results = http.request('POST', _ep, headers=_headers,
                            body=json.dumps(_datagtok))
    _jsonresults = json.loads(_results.data.decode('utf-8'))
    _token = _jsonresults['payload']['token']
    # step 2: perform the reset using the token
    _dataperform = {'action': 'performDatabaseReset', 'token': _token}
    _resultsp = http.request('POST', _ep, headers=_headers,
                             body=json.dumps(_dataperform))
    _jsonresultsp = json.loads(_resultsp.data.decode('utf-8'))
    _res = {'endpoint': _ep,
            'datarequestoken': _datagtok,
            'dataperform': _dataperform,
            'resetresults': _jsonresultsp}
    print(json.dumps(_res))
    return _res


def process_files_event_2(file):
    """Queue a bulk load for a single S3 key."""
    bucket = os.environ['NeptuneBucket']
    region = os.environ['AWS_REGION']
    endpoint = os.environ['NEPTUNE_CLUSTER_ENDPOINT']
    neptune_role = os.environ['NeptuneClusterRole']
    _nepload = [build_load_request(endpoint, bucket, file, neptune_role, region)]
    return loadfiles(_nepload)


def prep_files(event):
    """When the final export file arrives, load the full file set in order."""
    file = event['Records'][0]['s3']['object']['key']
    # derive the folder prefix by stripping the object's filename
    prefix = file[:file.rfind('/') + 1]
    if 'Attributegroup_attribute_Attributegroup_edges.csv' in file:
        # found the final file; wait a few seconds for the rest to be saved
        time.sleep(4)
        _ret = f'Received final file {file}'
        filestoload = [
            'Application.csv',
            'Attgroup.csv',
            'Application_Attgroup_Application_edges.csv',
            'Attributes.csv',
            'Attributegroup_attribute_Attributegroup_edges.csv']
        for _file in filestoload:
            _ret = process_files_event_2(f'{prefix}{_file}')
    else:
        _ret = f'Received {file}'
    return _ret


def lambda_handler(event, context):
    print(json.dumps(event))
    # alternative entry points:
    # return neptunedbreset(event)
    # _ret = process_files_event(event)
    _ret = prep_files(event)
    return {
        'statusCode': 200,
        'body': json.dumps(_ret)
    }
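
# --- Example (sketch, not part of the original handler) ---------------------
# Minimal local invocation with a hand-built S3 event. The bucket and key
# below are placeholders, and NeptuneBucket / AWS_REGION /
# NEPTUNE_CLUSTER_ENDPOINT / NeptuneClusterRole must be set in the
# environment (and a reachable Neptune cluster exists) for the load to run.
if __name__ == '__main__':
    sample_event = {
        'Records': [{
            's3': {
                'bucket': {'name': 'example-neptune-bucket'},  # placeholder
                'object': {'key': 'filestolaod/2024-01-01/'
                                  'Attributegroup_attribute_Attributegroup_edges.csv'}
            }
        }]
    }
    print(lambda_handler(sample_event, None))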