#!/usr/bin/env python

"""
    launch_instance.py:
    Lambda function that launches an EC2 instance with an
    AMI Lifecycle generated AMI to be scanned by AWS Inspector
    as part of the AMI Lifecycle VULNERABILITY_SCANS State Machine.
"""

import datetime
import json
import logging
import os
import traceback

import boto3

from ..services.constants_service import ConstantsService
from ..services.error_notifier_service import ErrorNotifierService

# environment variables (all required; a missing one fails the cold start)
EBS_VOLUME_SIZE = int(os.environ['EBS_VOLUME_SIZE'])
EC2_INSTANCE_TYPE = os.environ['EC2_INSTANCE_TYPE']
EC2_INSTANCE_PROFILE_ARN = os.environ['EC2_INSTANCE_PROFILE_ARN']
VPC_ID = os.environ['VPC_ID']
SUBNET_ID = os.environ['SUBNET_ID']
SECURITY_GROUP_ID = os.environ['SECURITY_GROUP_ID']

# constants
OPERATOR = "VULNERABILITY_SCAN_LAUNCH_INSTANCE"
TEMPLATE_FILE = "state_machine_error.template"

# boto 3
ec2_client = boto3.client('ec2')

# services
error_notifier_service = ErrorNotifierService()
constants_service = ConstantsService()

# set logging
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)


def launch_instance(
        ami_id: str,
        lifecycle_id: str,
        stack_tag: str
    ) -> str:
    """Launch a single EC2 instance from ``ami_id`` for vulnerability scanning.

    The instance is started in ``SUBNET_ID`` with ``SECURITY_GROUP_ID`` and the
    ``EC2_INSTANCE_PROFILE_ARN`` instance profile, and is tagged with the
    lifecycle id and stack tag passed in.

    Args:
        ami_id: Id of the AMI to boot.
        lifecycle_id: Value written to the ``AMI Lifecycle Id`` tag.
        stack_tag: Value written to the ``STACK_TAG`` tag.

    Returns:
        The ``InstanceId`` of the launched instance.

    Raises:
        ValueError: If the ``run_instances`` response carries no instance id.
    """
    logger.info("Launching EC2 instance for vulnerability scans")

    response = ec2_client.run_instances(
        BlockDeviceMappings=[
            {
                'DeviceName': '/dev/xvda',
                'Ebs': {
                    'DeleteOnTermination': True,
                    'VolumeSize': EBS_VOLUME_SIZE,
                    'VolumeType': 'gp2'
                },
            },
        ],
        IamInstanceProfile={
            'Arn': EC2_INSTANCE_PROFILE_ARN
        },
        ImageId=ami_id,
        InstanceType=EC2_INSTANCE_TYPE,
        MaxCount=1,
        MinCount=1,
        Monitoring={
            'Enabled': False
        },
        SecurityGroupIds=[
            SECURITY_GROUP_ID
        ],
        SubnetId=SUBNET_ID,
        TagSpecifications=[
            {
                'ResourceType': 'instance',
                'Tags': [
                    {
                        'Key': 'Usage',
                        'Value': 'AMI Lifecycle Vulnerability Scans'
                    },
                    {
                        'Key': 'Service',
                        'Value': 'AWS Inspector V2'
                    },
                    {
                        'Key': 'STACK_TAG',
                        'Value': stack_tag
                    },
                    {
                        'Key': 'AMI Lifecycle Id',
                        'Value': lifecycle_id
                    },
                    {
                        'Key': 'AMI_LC_EVENT_INSTANCE',
                        'Value': 'TRUE'
                    }
                ]
            }
        ]
    )

    instances = response.get('Instances') or []
    if instances and 'InstanceId' in instances[0]:
        return instances[0]['InstanceId']

    error_msg = (
        "The attempt to launch an EC2 instance for vulnerability scaning " +
        "has not returned an Instance Id as expected. See CloudWatch logs for details."
    )

    logger.error(error_msg)
    # default=str: the boto3 response can contain datetime values, which would
    # otherwise make json.dumps raise TypeError and hide the ValueError below.
    logger.error(json.dumps(response, indent=2, default=str))
    raise ValueError(error_msg)


def lambda_handler(event, context):
    """Lambda entry point: launch the scan instance and record the result.

    Reads ``cfn_stack_name``, ``ami_id`` and ``lifecycle_id`` from
    ``event['vulnerability_scans_operation']['input']``, launches the instance
    and writes the instance id, status and ``hasError`` flag into
    ``event['vulnerability_scans_operation']['output']``.

    Args:
        event: State machine payload; mutated in place and echoed back.
        context: Lambda context object (unused).

    Returns:
        A dict with ``statusCode`` (200 on success, 500 on failure), the
        updated ``event`` as ``body`` and JSON ``headers``. On failure an error
        notification is attempted before returning.
    """
    # print the event details
    logger.debug(json.dumps(event, indent=2))

    try:
        operation = event['vulnerability_scans_operation']

        # get details from previous stage
        cfn_stack_tag = operation['input']["cfn_stack_name"]
        vulnerability_scan__ami_id = operation['input']["ami_id"]
        lifecycle_id = operation['input']["lifecycle_id"]

        # Resolve the output container before launching so that a payload
        # without 'output' cannot fail after an instance is already running.
        output = operation.setdefault('output', {})

        logger.info("Executing vulnerability scans on EC2 instance")
        vulnerability_scan_instance_id = launch_instance(
            ami_id=vulnerability_scan__ami_id,
            lifecycle_id=lifecycle_id,
            stack_tag=cfn_stack_tag
        )

        # set task outputs
        output["vulnerability_scan_instance_id"] = vulnerability_scan_instance_id
        output['status'] = constants_service.STATUS_COMPLETED
        output['hasError'] = False

        return {
            'statusCode': 200,
            'body': event,
            'headers': {'Content-Type': 'application/json'}
        }

    except Exception as e:
        traceback.print_exception(type(e), value=e, tb=e.__traceback__)
        stack_trace = traceback.format_exc()
        logger.error(f'Error in executing {OPERATOR} operation: {str(e)}')

        # The failure may itself be a missing key in the event, so nothing
        # below may index the payload unguarded; a second exception here would
        # skip both the notification and the 500 response.
        operation = event.setdefault('vulnerability_scans_operation', {})
        output = operation.setdefault('output', {})
        output['status'] = constants_service.STATUS_ERROR
        output['hasError'] = True
        output['errorMessage'] = str(e)

        op_input = operation.get('input') or {}
        stack_tag = op_input.get('cfn_stack_name', 'UNKNOWN')
        lifecycle_id = op_input.get('lifecycle_id', 'UNKNOWN')

        # create error payload to send to the api
        # NOTE(review): EVENT_BUILD_AMI is reported for a vulnerability scan
        # failure; confirm whether a scan-specific event constant should be used.
        error_payload = {
            'name': constants_service.EVENT_BUILD_AMI,
            'status': constants_service.STATUS_ERROR,
            'status_date': datetime.datetime.now().strftime("%m/%d/%Y, %H:%M:%S"),
            'properties': {
                'task': OPERATOR,
                "error": str(e),
                "stack_trace": stack_trace,
                "stack_tag": stack_tag,
                "lifecycle_id": lifecycle_id
            }
        }

        subject = f"ERROR in {OPERATOR} state machine event for {stack_tag}"

        # Notification is best-effort: a delivery failure is logged and must
        # not replace the 500 response describing the original error.
        try:
            error_notifier_service.send_notification(
                subject=subject,
                template_name=TEMPLATE_FILE,
                template_attributes=error_payload,
                error_message=str(e),
                stack_trace=stack_trace
            )
        except Exception as err:
            logger.error(f"An error occurred attempting to send error notification: {str(err)}")

        return {
            'statusCode': 500,
            'body': event,
            'headers': {'Content-Type': 'application/json'}
        }