#!/usr/bin/env python """ get_scan_findings.py: Lambda function that retrieves AWS Inspector scan findings for an EC2 instance as part of the AMI Lifecycle VULNERABILITY_SCANS State Machine. """ import datetime import json import logging import os import traceback import boto3 from ..services.constants_service import ConstantsService from ..services.error_notifier_service import ErrorNotifierService # environment variables S3_BUCKET = os.environ['S3_BUCKET'] KMS_KEY_ARN = os.environ['KMS_KEY_ARN'] # constants OPERATOR = "VULNERABILITY_SCAN_GET_FINDINGS" TEMPLATE_FILE = "state_machine_error.template" SEVERITIES = ['INFORMATIONAL','LOW','MEDIUM','HIGH','CRITICAL','UNTRIAGED'] # boto 3 inspector2_client = boto3.client('inspector2') # services error_notifier_service = ErrorNotifierService() constants_service = ConstantsService() def number_of_severity_level_in_findings(severity: str, finding_report: list) -> int: ''' Return the number of findings with a specific severity level in the response ''' return len([finding for finding in finding_report if finding['severity'] == severity]) def get_inspector_findings(instance_id: str) -> dict: ''' Return all inspector findings for a specific EC2 Instance ID ''' findings = [] # Create a reusable Paginator paginator = inspector2_client.get_paginator('list_findings') # Create a PageIterator from the Paginator finding_iterator = paginator.paginate( filterCriteria={ 'resourceId': [ { 'comparison': 'EQUALS', 'value': instance_id }, ], } ) for finding_result in finding_iterator: findings = findings + finding_result['findings'] return findings def create_report(instance_id: str) -> dict: ''' Create an Inspector V2 report for a specific EC2 Instance ''' report = inspector2_client.create_findings_report( filterCriteria={ 'resourceId': [ { 'comparison': 'EQUALS', 'value': instance_id }, ], }, reportFormat='JSON', s3Destination={ 'bucketName': S3_BUCKET, 'kmsKeyArn': KMS_KEY_ARN } ) return { 'S3Bucket': S3_BUCKET, 'S3Key': report['reportId'] } def lambda_handler(event,context): # set logging logger = logging.getLogger() logger.setLevel(logging.DEBUG) # print the event details logger.debug(json.dumps(event, indent=2)) try: # get details from previous stage vulnerability_scan_instance_id = event['vulnerability_scans_operation']['output']["vulnerability_scan_instance_id"] findings_by_severity = {} findings = get_inspector_findings(vulnerability_scan_instance_id) report_details = create_report(vulnerability_scan_instance_id) if len(findings) > 0: contains_vulnerabilities = True number_of_findings = len(findings) logger.info(f"Instance contains {str(number_of_findings)} Vulnerabilities") for severity_level in SEVERITIES: findings_by_severity[severity_level] = str(number_of_severity_level_in_findings(severity_level, findings)) else: contains_vulnerabilities = False number_of_findings = 0 logger.info("No Vulnerabilities found in EC2 Instance") for severity_level in SEVERITIES: findings_by_severity[severity_level] = "0" # set task outputs event['vulnerability_scans_operation']['output']['ami_contains_vulnerabilities'] = contains_vulnerabilities event['vulnerability_scans_operation']['output']['total_vulnerabilities'] = str(number_of_findings) event['vulnerability_scans_operation']['output']['vulnerability_severities'] = findings_by_severity event['vulnerability_scans_operation']['output']['vulnerability_scan_report'] = report_details event['vulnerability_scans_operation']['output']['status'] = constants_service.STATUS_COMPLETED event['vulnerability_scans_operation']['output']['hasError'] = False return { 'statusCode': 200, 'body': event, 'headers': {'Content-Type': 'application/json'} } except Exception as e: traceback.print_exception(type(e), value=e, tb=e.__traceback__) stack_trace = traceback.format_exc() logger.error(f'Error in executing {OPERATOR} operation: {str(e)}') event['vulnerability_scans_operation']['output']['status'] = constants_service.STATUS_ERROR event['vulnerability_scans_operation']['output']['hasError'] = True event['vulnerability_scans_operation']['output']['errorMessage'] = str(e) # create error payload to send to the api error_payload = {} error_payload['name'] = constants_service.EVENT_BUILD_AMI error_payload['status'] = constants_service.STATUS_ERROR error_payload['status_date'] = datetime.datetime.now().strftime("%m/%d/%Y, %H:%M:%S") stack_tag = event['vulnerability_scans_operation']['input']['cfn_stack_name'] lifecycle_id = event['vulnerability_scans_operation']['input']['lifecycle_id'] properties = { 'task': OPERATOR, "error": str(e), "stack_trace": stack_trace, "stack_tag": stack_tag, "lifecycle_id": lifecycle_id } error_payload['properties'] = properties subject = f"ERROR in {OPERATOR} state machine event for {stack_tag}" try: error_notifier_service.send_notification( subject=subject, template_name=TEMPLATE_FILE, template_attributes=error_payload, error_message=str(e), stack_trace=stack_trace ) except Exception as err: logger.error(f"An error occurred attempting to send error notification: {str(err)}") return { 'statusCode': 500, 'body': event, 'headers': {'Content-Type': 'application/json'} }