# Copyright 2022 Amazon.com, Inc. or its affiliates. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License").
# You may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Routing Lambda: catalogs S3 object events in DynamoDB and forwards
raw-stage objects to the matching team/pipeline StageA SQS FIFO queue."""

import os
import json
import logging
import uuid
from datetime import datetime
from urllib.parse import unquote_plus

import boto3
from boto3.dynamodb.conditions import Key
from botocore.exceptions import ClientError

logger = logging.getLogger()
logger.setLevel(logging.INFO)

sqs = boto3.resource('sqs')
ssm = boto3.client('ssm')
dynamodb = boto3.resource('dynamodb')
dataset_table = dynamodb.Table('octagon-Datasets-{}'.format(os.environ['ENV']))
catalog_table = dynamodb.Table(f"octagon-ObjectMetadata-{os.environ['ENV']}")
prefix = os.environ["PREFIX"]


def parse_s3_event(s3_event):
    """Extract bucket, key, and timing details from a CloudTrail-style S3 event."""
    return {
        'bucket': s3_event["detail"]["requestParameters"]["bucketName"],
        'key': s3_event["detail"]["requestParameters"]["key"],
        'timestamp': int(round(datetime.utcnow().timestamp() * 1000, 0)),
        'last_modified_date': s3_event['time'].split('.')[0] + '+00:00'
    }


def get_item(table, team, dataset):
    """Return the pipeline registered for a team/dataset in the Datasets table."""
    try:
        response = table.get_item(
            Key={
                'name': '{}-{}'.format(team, dataset)
            }
        )
    except ClientError as e:
        logger.error(e.response['Error']['Message'])
    else:
        # Raises KeyError if the dataset is not registered; the caller treats
        # that as a signal to fall back to the customer config lookup.
        item = response['Item']
        return item['pipeline']


def delete_item(table, key):
    """Delete an item from the catalog table."""
    try:
        response = table.delete_item(Key=key)
    except ClientError:
        logger.error('Fatal error', exc_info=True)
        raise
    else:
        return response


def put_item(table, item, key):
    """Insert an item into the catalog table, ignoring duplicate keys."""
    try:
        response = table.put_item(
            Item=item,
            ConditionExpression=f"attribute_not_exists({key})",
        )
    except ClientError as e:
        if e.response['Error']['Code'] == "ConditionalCheckFailedException":
            logger.info(e.response['Error']['Message'])
        else:
            raise
    else:
        return response


def catalog_item(s3_event, message):
    """Record the object event in the ObjectMetadata catalog table."""
    try:
        operation = s3_event["detail"]["eventName"]
        if operation == 'DeleteObject':
            object_id = 's3://{}/{}'.format(
                message['bucket'],
                unquote_plus(message['key'])
            )
            delete_item(catalog_table, {'id': object_id})
        else:
            message['id'] = f"s3://{message['bucket']}/{message['key']}"
            # Derive the data lake stage from the bucket name suffix.
            message['stage'] = message['bucket'].split('-')[-1]
            if message['stage'] not in ['raw', 'stage', 'analytics']:
                message['stage'] = 'raw'
            put_item(catalog_table, message, 'id')
    except ClientError as e:
        logger.error(e.response['Error']['Message'])
    else:
        return message


def lambda_handler(event, context):
    try:
        logger.info(f"Event: {event}, context: {context}")
        logger.info('Parsing S3 Event')
        message = parse_s3_event(event)
        message = catalog_item(event, message)

        if message['stage'] == 'raw':
            # Raw-stage keys follow the convention <team>/<dataset>/...
            team = message['key'].split('/')[0]
            dataset = message['key'].split('/')[1]
            logger.info('team: {}; dataset: {}; bucket: {}; key: {}'.format(
                team, dataset, message['bucket'], message['key']))
            try:
                pipeline = get_item(dataset_table, team, dataset)
            except Exception:
                logger.info('exception thrown')
                logger.info('checking if ingestion is from outside data lake...')
                # Dataset not registered: resolve team, dataset, and pipeline
                # from the customer configuration table, keyed by bucket name.
                customer_config = ssm.get_parameter(
                    Name="/AMC/DynamoDB/DataLake/CustomerConfig",
                    WithDecryption=True
                ).get('Parameter').get('Value')
                config_table = dynamodb.Table(customer_config)
                response = config_table.query(
                    IndexName='amc-index',
                    Select='ALL_PROJECTED_ATTRIBUTES',
                    KeyConditionExpression=Key('hash_key').eq(message['bucket'])
                )
                dataset = response['Items'][0]['dataset']
                team = response['Items'][0]['team']
                pipeline = get_item(dataset_table, team, dataset)

            message['team'] = team
            message['dataset'] = dataset
            message['pipeline'] = pipeline
            message['org'] = os.environ['ORG']
            message['app'] = os.environ['APP']
            message['env'] = os.environ['ENV']
            message['pipeline_stage'] = 'StageA'

            logger.info('Sending event to {}-{} pipeline queue for processing'.format(team, pipeline))
            queue = sqs.get_queue_by_name(QueueName='{}-{}-{}-queue-a.fifo'.format(
                prefix, team, pipeline
            ))
            queue.send_message(
                MessageBody=json.dumps(message),
                MessageGroupId='{}-{}'.format(team, dataset),
                MessageDeduplicationId=str(uuid.uuid1())
            )
    except Exception:
        logger.error("Fatal error", exc_info=True)
        raise
    return
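

# ---------------------------------------------------------------------------
# Illustrative local smoke test only; not part of the deployed handler.
# A minimal sketch of the CloudTrail-style S3 event shape that lambda_handler
# consumes. The bucket, key, and time values below are hypothetical, and
# running this assumes the ENV, PREFIX, ORG, and APP environment variables are
# set and the backing AWS resources (DynamoDB tables, SSM parameter, SQS
# queue) exist.
if __name__ == "__main__":
    sample_event = {
        "time": "2022-01-01T00:00:00Z",
        "detail": {
            "eventName": "PutObject",
            "requestParameters": {
                "bucketName": "example-prefix-dev-raw",
                "key": "exampleteam/exampledataset/object.csv",
            },
        },
    }
    lambda_handler(sample_event, None)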