# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# Permission is hereby granted, free of charge, to any person obtaining a copy of this
# software and associated documentation files (the "Software"), to deal in the Software
# without restriction, including without limitation the rights to use, copy, modify,
# merge, publish, distribute, sublicense, and/or sell copies of the Software, and to
# permit persons to whom the Software is furnished to do so.
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
# INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
# PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
# HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
# OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
# SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
"""CloudFormation custom resource handler that routes traffic to a Network Firewall endpoint.

On Create it looks up the firewall's endpoint for the requested availability
zone and adds a route to it; on Delete it removes that route; Update is a no-op.
"""

import json
import logging
from typing import Dict

import boto3
import botocore
import urllib3

logger = logging.getLogger(__name__)
http = urllib3.PoolManager()
nfw = boto3.client("network-firewall")
ec2 = boto3.client("ec2")

# Error text reported back to CloudFormation, keyed by request type.
_FAILURE_MESSAGES = {
    "Create": "Create route failed for firewall",
    "Delete": "Delete route failed for firewall",
}


def send_response(event, context, response, responseData):
    """Send a response to CloudFormation to handle the custom resource.

    Args:
        event: The custom resource request event. ``StackId``, ``RequestId``,
            ``LogicalResourceId`` and ``ResponseURL`` are read from it, and
            ``PhysicalResourceId`` is reused when present.
        context: The Lambda context; its ``log_stream_name`` is used in the
            reason text and as the physical ID when the event carries none.
        response: The status string to report (``"SUCCESS"`` or ``"FAILED"``).
        responseData: Dict placed in the ``Data`` field of the response.

    Returns:
        Always ``True``; a failed PUT is logged as a warning, not raised.

    Raises:
        KeyError: If ``event`` lacks one of the required keys listed above.
    """
    response_body = {
        "Status": response,
        "Reason": f"See details in CloudWatch Log Stream: {context.log_stream_name}",
        # Reuse the ID issued at Create time. Returning a different ID on Update
        # makes CloudFormation treat it as a replacement and follow up with a
        # Delete for the old ID, which would remove the route.
        "PhysicalResourceId": event.get("PhysicalResourceId") or context.log_stream_name,
        "StackId": event["StackId"],
        "RequestId": event["RequestId"],
        "LogicalResourceId": event["LogicalResourceId"],
        "Data": responseData,
    }
    json_response_body = json.dumps(response_body)
    logger.info("RESPONSE BODY: \n%s", json_response_body)

    response_url = event["ResponseURL"]
    headers = {"content-type": "", "content-length": str(len(json_response_body))}
    try:
        http_response = http.request(
            "PUT", response_url, headers=headers, body=json_response_body
        )
        # ``status`` is an int; format it rather than concatenating to a str.
        logger.info("Status code: %s", http_response.status)
    except Exception as e:
        # Best effort: there is nobody left to report a delivery failure to.
        logger.warning("send(..) failed executing http.request(..): %s", e)
    return True


def get_data(firewall_arn: str) -> Dict[str, str]:
    """Return the firewall's endpoint IDs keyed by the ``SyncStates`` keys.

    Args:
        firewall_arn: ARN passed to ``describe_firewall``.

    Returns:
        A mapping of each ``SyncStates`` key (the handler looks entries up by
        the ``SubnetAz`` property) to its attachment's ``EndpointId``. Entries
        that have no attachment or no endpoint ID yet are omitted.
    """
    response = nfw.describe_firewall(FirewallArn=firewall_arn)
    sync_states = response["FirewallStatus"].get("SyncStates", {})
    endpoints = {}
    for zone, state in sync_states.items():
        endpoint_id = state.get("Attachment", {}).get("EndpointId")
        if endpoint_id:
            endpoints[zone] = endpoint_id
    return endpoints


def _create_route(properties):
    """Create the route that sends ``DestinationCidr`` to the firewall endpoint."""
    subnet_az = properties["SubnetAz"]
    endpoints = get_data(properties["FirewallArn"])
    if subnet_az not in endpoints:
        raise LookupError(f"No firewall endpoint available for {subnet_az}")
    ec2.create_route(
        DestinationCidrBlock=properties["DestinationCidr"],
        RouteTableId=properties["RouteTableId"],
        VpcEndpointId=endpoints[subnet_az],
    )


def _delete_route(properties):
    """Delete the route for ``DestinationCidr``; a route that is already gone is fine."""
    try:
        ec2.delete_route(
            DestinationCidrBlock=properties["DestinationCidr"],
            RouteTableId=properties["RouteTableId"],
        )
    except botocore.exceptions.ClientError as error:
        # A Delete can arrive for a route that was never created (for example
        # after a failed Create); failing here would block the stack cleanup.
        if error.response.get("Error", {}).get("Code") != "InvalidRoute.NotFound":
            raise
        logger.info("Route not found; nothing to delete")


def lambda_handler(event, context):
    """Handle a CloudFormation custom resource request for the firewall route.

    ``Create`` adds the route, ``Delete`` removes it, and ``Update`` reports
    success without changing anything. For these three request types a
    response is always sent, including when the work raises, so the stack
    does not wait for a reply that never comes.

    Args:
        event: The custom resource request event. ``RequestType`` selects the
            action; ``ResourceProperties`` supplies ``FirewallArn``,
            ``SubnetAz``, ``DestinationCidr`` and ``RouteTableId``.
        context: The Lambda context, forwarded to ``send_response``.
    """
    request_type = event["RequestType"]
    if request_type not in ("Create", "Update", "Delete"):
        logger.warning("Ignoring unsupported RequestType: %s", request_type)
        return

    response = "SUCCESS"
    data = {}
    try:
        if request_type == "Create":
            _create_route(event["ResourceProperties"])
        elif request_type == "Delete":
            _delete_route(event["ResourceProperties"])
        # Update: intentionally a no-op.
    except Exception as error:
        # Top-level boundary: any failure must still be reported to
        # CloudFormation, not only botocore client errors.
        logger.exception("error due to %s", error)
        response = "FAILED"
        data = {"Error": _FAILURE_MESSAGES[request_type]}
    send_response(event, context, response, data)