AWSTemplateFormatVersion: 2010-09-09 Parameters: ParentStackID: Type: String Description: The Logical ID of the Parent Vendor Insights Onboarding Stack for this region IsAWSConfigNeeded: Type: String Default: "Yes" Description: The parameter governing whether AWS Config is even needed for the current deployment context AllowedValues: - "Yes" - "No" Resources: EnabledAWSConfigCheckFunction: Type: AWS::Lambda::Function Properties: Environment: Variables: ParentStackName: !Ref ParentStackID IsConfigNeeded: !Ref IsAWSConfigNeeded Handler: index.lambda_handler Timeout: 900 Role: !GetAtt [VendorInsightsEnableAWSConfigCheckRole, Arn] Runtime: python3.9 Code: ZipFile: | # Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. import boto3 import cfnresponse import json import logging import uuid import os logger = logging.getLogger(__name__) logger.setLevel(logging.INFO) def is_aws_config_delivery_channel_created(): """ Verify whether there exists a Delivery Channel for AWS Config. Parameters: - None Returns: - Boolean """ try: parent_stack_name = os.environ['ParentStackName'] client = boto3.client('config') response = client.describe_delivery_channels() delivery_channels = response["DeliveryChannels"] num_delivery_channels = len(delivery_channels) if num_delivery_channels == 0: return False logger.info("AWS Config Delivery Channels Count : {0}".format(num_delivery_channels)) s3_client = boto3.client('s3') for delivery_channel in delivery_channels: if ('s3BucketName' in delivery_channel) and (parent_stack_name.lower() in delivery_channel['s3BucketName']): bucket_name = delivery_channel['s3BucketName'] s3_client.put_public_access_block( Bucket=bucket_name, PublicAccessBlockConfiguration={ 'BlockPublicAcls': True, 'IgnorePublicAcls': True, 'BlockPublicPolicy': True, 'RestrictPublicBuckets': True }, ) return True except Exception as e: logger.exception("Could not retreive the number of AWS Config Delivery Channels") logger.exception(str(e)) raise def is_aws_config_recorder_created(): """ Verify whether there exists a Recorder for AWS Config. Parameters: - None Returns: - Boolean """ try: client = boto3.client('config') response = client.describe_configuration_recorders() num_configuration_recorders = len(response["ConfigurationRecorders"]) if num_configuration_recorders == 0: return False logger.info("Number of AWS Config Configuration Recorders : {0}".format(num_configuration_recorders)) return True except Exception as e: logger.exception("Could not count the number of AWS Config configuration recorders") logger.exception(str(e)) raise def is_aws_config_started_recording(): """ Verify whether recording has been enabled for AWS Config. Parameters: - None Returns: - Boolean """ try: client = boto3.client('config') response = client.describe_configuration_recorder_status() config_recorder_status = response["ConfigurationRecordersStatus"] for config_recorder in config_recorder_status: if config_recorder["recording"] == True: logger.info("AWS Config Configuration Recorder is recording") return True logger.info("AWS Config Configuration Recorder is not recording") return False except Exception as e: logger.exception("Could not determine config recorder status for AWS Config") logger.exception(str(e)) raise def is_config_enabled(): """ Verify whether AWS Config has already been enabled. Parameters: - None Returns: - Boolean """ try: return is_aws_config_delivery_channel_created() \ and is_aws_config_recorder_created() \ and is_aws_config_started_recording() except Exception as e: logger.exception("Could not determine if AWS Config is enabled") logger.exception(str(e)) raise def is_config_needed(): """ Verify whether AWS Config is even needed for the current deployment context Parameters: - None Returns: - Boolean """ try: is_config_needed = os.environ['IsConfigNeeded'] if is_config_needed == "No": return False else: return True except Exception as e: logger.exception("Could not find 'IsConfigNeeded' as an environment variable") logger.exception(str(e)) raise def start_recording(): try: client = boto3.client('config') response = client.describe_configuration_recorder_status() config_recorder_status = response["ConfigurationRecordersStatus"] for config_recorder in config_recorder_status: config_recorder_name = config_recorder['name'] client.start_configuration_recorder(ConfigurationRecorderName=config_recorder_name) except Exception as e: logger.exception("Could not start recording for already existing Config Recorder Resources") logger.exception(str(e)) raise def are_config_resources_present(): """ Verify there already exist AWS Config resources Parameters: - None Returns: - Boolean """ try: return is_aws_config_delivery_channel_created() \ and is_aws_config_recorder_created() except Exception as e: logger.exception("Could not determine if AWS Config resources are present") logger.exception(str(e)) raise def is_recorder_from_older_templates(): """ Determine whether the Config Recorder in use at the moment is from Vendor Insights Parameters: - None Returns: - Boolean """ try: config_client = boto3.client('config') response = config_client.describe_configuration_recorders() configuration_recorders = response["ConfigurationRecorders"] parent_stack_name = os.environ['ParentStackName'] for configuration_recorder in configuration_recorders: recorder_name = configuration_recorder["name"] if parent_stack_name in recorder_name: return True return False except Exception as e: logger.exception("Couldn't determine the origin of the current Configuration Recorder") logger.exception(str(e)) raise def delete_config_recorder(): """ Delete the Config Recorder that originated from Vendor Insights Onboarding Template Parameters: - None Returns: - None """ try: config_client = boto3.client('config') response = config_client.describe_configuration_recorders() configuration_recorders = response["ConfigurationRecorders"] parent_stack_name = os.environ['ParentStackName'] for configuration_recorder in configuration_recorders: recorder_name = configuration_recorder["name"] if parent_stack_name in recorder_name: config_client.delete_configuration_recorder(ConfigurationRecorderName=recorder_name) except Exception as e: logger.exception("Encountered Exception while trying to delete configuration recorder") logger.exception(str(e)) raise def lambda_handler(event, context): logger.info("CloudFormation Custom Resource was invoked with event: " + json.dumps(event, default=str)) EVENT_TYPE = event['RequestType'] if ('PhysicalResourceId' in event) and event['PhysicalResourceId']: physicalResourceId = event['PhysicalResourceId'] logger.info("Reusing the same physicalResourceId as: " + physicalResourceId) else: physicalResourceId = str(uuid.uuid4()) logger.info("physicalResourceId does not exist in the event, creating new one: " + physicalResourceId) try: response_data = {} if EVENT_TYPE == 'Delete': cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data, physicalResourceId) elif EVENT_TYPE == 'Create': if not is_config_needed(): response_data["TurnOnConfig"] = "No" elif is_config_enabled(): response_data["TurnOnConfig"] = "No" elif are_config_resources_present(): response_data["TurnOnConfig"] = "No" # Turn On AWS Config Recording with already present resources start_recording() else: response_data["TurnOnConfig"] = "Yes" cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data, physicalResourceId) elif EVENT_TYPE == 'Update': if not is_config_needed(): response_data["TurnOnConfig"] = "No" elif is_config_enabled(): response_data["TurnOnConfig"] = "No" elif are_config_resources_present(): response_data["TurnOnConfig"] = "No" # Turn On AWS Config Recording with already present resources start_recording() else: response_data["TurnOnConfig"] = "Yes" # Deal with the edge case of dealing with an older version of CheckAWSConfig if response_data["TurnOnConfig"] == "No" and is_config_needed(): if is_recorder_from_older_templates(): delete_config_recorder() response_data["TurnOnConfig"] = "Yes" cfnresponse.send(event, context, cfnresponse.SUCCESS, response_data, physicalResourceId) except Exception as e: logger.exception(str(e)) response_data = {"error": str(e)} cfnresponse.send(event, context, cfnresponse.FAILED, response_data, physicalResourceId) VendorInsightsEnableAWSConfigCheckRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole - arn:aws:iam::aws:policy/service-role/AWSConfigRulesExecutionRole - arn:aws:iam::aws:policy/service-role/AWS_ConfigRole - arn:aws:iam::aws:policy/AWSCloudFormationFullAccess - arn:aws:iam::aws:policy/AmazonS3FullAccess Policies: - PolicyName: VendorInsightsAWSConfigFullAccess PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - 'config:*' Resource: - '*' EnabledAWSConfigCheckFunctionTrigger: Type: AWS::CloudFormation::CustomResource Properties: ServiceToken: !GetAtt EnabledAWSConfigCheckFunction.Arn # This versions needs to be updated to trigger update events on the lambda function UUID: "db7815a5-5154-40b3-9a0d-32c8632c15e7" IsAWSConfigNeeded: !Ref IsAWSConfigNeeded AWSConfigResourcesStack: Type: AWS::CloudFormation::Stack DeletionPolicy: Retain DependsOn: - EnabledAWSConfigCheckFunctionTrigger Properties: Parameters: SetupConfig: !GetAtt EnabledAWSConfigCheckFunctionTrigger.TurnOnConfig Tags: - Key: "AssociateWith" Value: "AWSVendorInsights" TemplateURL: https://aws-vendor-insights.s3.amazonaws.com/vendor-onboarding-templates/v0/EnableAWSConfig.yaml TimeoutInMinutes: 20 Outputs: TurnOnConfig: Description: "Should Enable AWS Config" Value: !GetAtt EnabledAWSConfigCheckFunctionTrigger.TurnOnConfig