#!/usr/bin/env python """ poll_scan_status.py: Lambda function that checks the status of AWS Inspector scan findings for an EC2 instance for the AMI Lifecycle VULNERABILITY_SCANS State Machine. """ import datetime import json import logging import traceback import boto3 from ..services.constants_service import ConstantsService from ..services.error_notifier_service import ErrorNotifierService # constants OPERATOR = "VULNERABILITY_SCAN_POLL_SCAN_STATUS" TEMPLATE_FILE = "state_machine_error.template" # boto 3 inspector2_client = boto3.client('inspector2') # services error_notifier_service = ErrorNotifierService() constants_service = ConstantsService() def lambda_handler(event: dict, context: dict) -> dict: # set logging logger = logging.getLogger() logger.setLevel(logging.DEBUG) # print the event details logger.debug(json.dumps(event, indent=2)) try: # get details from previous stage vulnerability_scan_instance_id = event['vulnerability_scans_operation']['output']["vulnerability_scan_instance_id"] response = inspector2_client.list_coverage( filterCriteria={ 'resourceId': [ { 'comparison': 'EQUALS', 'value': vulnerability_scan_instance_id }, ], } ) scan_status_code = "NOT_AVAILABLE" scan_status_reason = "NOT_AVAILABLE" if len(response["coveredResources"]) > 0: scan_status_code = response["coveredResources"][0]['scanStatus']['statusCode'] scan_status_reason = response["coveredResources"][0]['scanStatus']['reason'] logger.info( f"EC2 Instance: {vulnerability_scan_instance_id}, " + f"AWS Inspector V2 status code: {scan_status_code}, " + f"AWS Inspector V2 status reason: {scan_status_reason}" ) # set task outputs event['vulnerability_scans_operation']['output']['scan_status_code'] = str(scan_status_code).upper() event['vulnerability_scans_operation']['output']['scan_status_reason'] = str(scan_status_reason).upper() event['vulnerability_scans_operation']['output']['status'] = constants_service.STATUS_COMPLETED event['vulnerability_scans_operation']['output']['hasError'] = False return { 'statusCode': 200, 'body': event, 'headers': {'Content-Type': 'application/json'} } except Exception as e: traceback.print_exception(type(e), value=e, tb=e.__traceback__) stack_trace = traceback.format_exc() logger.error(f'Error in executing {OPERATOR} operation: {str(e)}') event['vulnerability_scans_operation']['output']['status'] = constants_service.STATUS_ERROR event['vulnerability_scans_operation']['output']['hasError'] = True event['vulnerability_scans_operation']['output']['errorMessage'] = str(e) # create error payload to send to the api error_payload = {} error_payload['name'] = constants_service.EVENT_BUILD_AMI error_payload['status'] = constants_service.STATUS_ERROR error_payload['status_date'] = datetime.datetime.now().strftime("%m/%d/%Y, %H:%M:%S") stack_tag = event['vulnerability_scans_operation']['input']['cfn_stack_name'] lifecycle_id = event['vulnerability_scans_operation']['input']['lifecycle_id'] properties = { 'task': OPERATOR, "error": str(e), "stack_trace": stack_trace, "stack_tag": stack_tag, "lifecycle_id": lifecycle_id } error_payload['properties'] = properties subject = f"ERROR in {OPERATOR} state machine event for {stack_tag}" try: error_notifier_service.send_notification( subject=subject, template_name=TEMPLATE_FILE, template_attributes=error_payload, error_message=str(e), stack_trace=stack_trace ) except Exception as err: logger.error(f"An error occurred attempting to send error notification: {str(err)}") return { 'statusCode': 500, 'body': event, 'headers': {'Content-Type': 'application/json'} }