---
AWSTemplateFormatVersion: '2010-09-09'
Description: Template creating a custom Lambda-backed Config rule that checks for public AMIs
Parameters:
  S3BucketName:
    Type: String
    Description: S3 bucket for evidence
  BucketRegion:
    Type: String
    Default: 'us-east-1'
    Description: S3 bucket region
  ReportName:
    Type: String
    Description: Name of the evidence report (e.g. report.html)
  S3Prefix:
    Type: String
    Description: S3 prefix to store evidence (e.g. PREFIX or PREFIX1/PREFIX2)
  ConfigRuleName:
    Type: String
    Description: Name for the new Config rule
Resources:
  ReportNameParam:
    Type: AWS::SSM::Parameter
    Properties:
      Description: Evidence report name
      Name: /AuditManagerEvidenceAutomation/report_name
      Type: String
      Value: !Ref ReportName
  S3BucketParam:
    Type: AWS::SSM::Parameter
    Properties:
      Description: S3 bucket for evidence
      Name: /AuditManagerEvidenceAutomation/s3_bucket
      Type: String
      Value: !Ref S3BucketName
  S3BucketRegionParam:
    Type: AWS::SSM::Parameter
    Properties:
      Description: Region for evidence S3 bucket
      Name: /AuditManagerEvidenceAutomation/bucket_region
      Type: String
      Value: !Ref BucketRegion
  S3PrefixParam:
    Type: AWS::SSM::Parameter
    Properties:
      Description: S3 bucket prefix for evidence
      Name: /AuditManagerEvidenceAutomation/s3_prefix
      Type: String
      Value: !Ref S3Prefix
  ConfigRuleFunction:
    DependsOn: LambdaLogGroup
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Ref ConfigRuleName
      Handler: index.lambda_handler
      Role: !GetAtt ConfigRuleLambdaExecutionRole.Arn
      MemorySize: 128
      Timeout: 360
      Environment:
        Variables:
          report_name: !Ref ReportNameParam
          evidence_bucket: !Ref S3BucketParam
          bucket_region: !Ref S3BucketRegionParam
          s3_prefix: !Ref S3PrefixParam
      Code:
        ZipFile: |
          '''
          #####################################
          ##             Gherkin             ##
          #####################################
          Rule Name:
            {Rule Name Here}
          Description:
            {Place description of rule logic including why it evaluates compliant or non-compliant here}
          Trigger:
            Periodic
          Reports on:
            AWS::::Account
          Rule Parameters:
            None
          Scenarios:
            Scenario: 1
              Given: No AMIs with the "is-public" attribute set to True
              Then: Return COMPLIANT
            Scenario: 2
              Given: One or more AMIs with the "is-public" attribute set to True
              Then: Return NON_COMPLIANT with an Annotation containing the AMI IDs
          '''
          import json
          import time
          import sys
          import datetime
          import os

          import boto3
          import botocore

          # liblogging is optional; use it only if it is available in the runtime.
          try:
              import liblogging
          except ImportError:
              pass

          # Define the default resource to report to Config Rules
          DEFAULT_RESOURCE_TYPE = 'AWS::::Account'

          # Set to True to have the Lambda assume the role attached to the Config service (useful for cross-account).
          ASSUME_ROLE_MODE = False

          # Other parameters (no change needed)
          CONFIG_ROLE_TIMEOUT_SECONDS = 900

          # Import parameters from Parameter Store. The parameter names are passed in via Lambda environment variables.
          ssm_client = boto3.client("ssm")
          try:
              s3_bucket = (
                  ssm_client.get_parameter(Name=os.environ["evidence_bucket"], WithDecryption=True)
                  .get("Parameter")
                  .get("Value")
              )
          except botocore.exceptions.ClientError as error:
              print("Error getting S3 bucket parameter. Ensure the env var is set and points to a Parameter Store parameter")
              raise error
          try:
              s3_region = (
                  ssm_client.get_parameter(Name=os.environ["bucket_region"], WithDecryption=True)
                  .get("Parameter")
                  .get("Value")
              )
          except botocore.exceptions.ClientError as error:
              print("Error getting S3 region parameter. Ensure the env var is set and points to a Parameter Store parameter")
              raise error
          try:
              s3_report_name = (
                  ssm_client.get_parameter(Name=os.environ["report_name"], WithDecryption=True)
                  .get("Parameter")
                  .get("Value")
              )
          except botocore.exceptions.ClientError as error:
              print("Error getting report name parameter. Ensure the env var is set and points to a Parameter Store parameter")
              raise error
          try:
              s3_prefix = (
                  ssm_client.get_parameter(Name=os.environ["s3_prefix"], WithDecryption=True)
                  .get("Parameter")
                  .get("Value")
              )
          except botocore.exceptions.ClientError as error:
              print("Error getting S3 prefix parameter. Ensure the env var is set and points to a Parameter Store parameter")
              raise error
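          # Each environment variable above holds the *name* of an SSM parameter created by this
          # template under /AuditManagerEvidenceAutomation/; the values resolved here are the report
          # file name, the evidence bucket, the bucket region and the key prefix used when the
          # evidence report is written and uploaded.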
          # Class that breaks out the Config rule tags used in the report header
          class TagData:
              def __init__(self, ControlID, ControlText, Framework, RequestedInfo):
                  self.ControlID = ControlID
                  self.ControlText = ControlText
                  self.Framework = Framework
                  self.RequestedInfo = RequestedInfo

          # HTML fragments used to build the evidence report
          html_head = ["<!DOCTYPE html>\n", "<html>\n", "<head>\n", "<title>"]
          html_body = ["</title>\n", "</head>\n", "<body>\n"]
          html_done = ["</body>\n", "</html>\n"]
          html_style = """
          <style>
          table {
              border-collapse: collapse;
          }
          body {
              font-family: Courier, Helvetica, sans-serif;
          }
          </style>
          """

          def writeh1(fhw, text):
              fhw.write("<h1>" + text + "</h1>\n")

          def writeh2(fhw, text):
              fhw.write("<h2>" + text + "</h2>\n")

          def writeh3(fhw, text):
              fhw.write("<h3>" + text + "</h3>\n")

          # This function writes the evidence report to /tmp
          def writeReport(compliant_evidence, non_compliant_evidence, event):
              jdata = json.dumps(compliant_evidence, indent=4, separators=(". ", " = "))
              jdata = jdata.replace('\n', '<br>')
              jdata = jdata.replace(' ', '&nbsp;')
              jdata2 = json.dumps(non_compliant_evidence, indent=4, separators=(". ", " = "))
              jdata2 = jdata2.replace('\n', '<br>')
              jdata2 = jdata2.replace(' ', '&nbsp;')
              tags = getTagListforThisRule(event['configRuleArn'])
              print(f"Here are the tags: {tags.ControlText}.")
              try:
                  fhw = open("/tmp/" + s3_report_name, "w")
              except OSError:
                  print("Could not open " + s3_report_name)
                  sys.exit(1)
              fhw.writelines(html_head)
              fhw.writelines(html_body)
              fhw.writelines(html_style)
              writeh1(fhw, f"Control ID: {tags.ControlID}")
              writeh2(fhw, f"Date Evidence Collected: {str(datetime.date.today())}")
              writeh3(fhw, f"Description: {tags.ControlText}")
              writeh3(fhw, f"AWS Account Number: {event['accountId']}")
              writeh3(fhw, f"Evidence Requested: {tags.RequestedInfo}")
              if jdata:
                  writeh3(fhw, "Compliant Evidence")
                  fhw.writelines('<code style="background-color: #dddddd">' + jdata + '</code>')
              if jdata2:
                  writeh3(fhw, "Non-compliant Evidence")
                  fhw.writelines('<code style="background-color: #dddddd">' + jdata2 + '</code>')
              fhw.writelines(html_done)
              fhw.close()

          # Get the Config rule tags used to populate the report header
          def getTagListforThisRule(ruleArn):
              client = boto3.client('config')
              response = client.list_tags_for_resource(
                  ResourceArn=ruleArn
              )['Tags']
              ControlID = ''
              ControlText = ''
              Framework = ''
              RequestedInfo = ''
              for t in response:
                  if t['Key'] == 'ControlID':
                      ControlID = t['Value']
                  elif t['Key'] == 'ControlText':
                      ControlText = t['Value']
                  elif t['Key'] == 'Framework':
                      Framework = t['Value']
                  elif t['Key'] == 'RequestedInfo':
                      RequestedInfo = t['Value']
              return TagData(ControlID, ControlText, Framework, RequestedInfo)

          # This function uploads the evidence report to the designated S3 bucket and returns a
          # console URL for the uploaded object. The key layout is:
          #   <S3Prefix>/<config-rule-name>/<account-id>[/<resource-id>]/<YYYY-MM-DD>/<HH-MM>/<report name>
          def upload_evidence(event, configuration_item):
              s3client = boto3.client('s3', region_name=s3_region)
              s3_url_prefix = 'https://s3.console.aws.amazon.com/s3/buckets/'
              s3path = s3_prefix + '/' + event['configRuleName'] + "/" + event['accountId'] + "/" + str(
                  datetime.date.today()) + "/" + time.strftime("%H-%M", time.localtime()) + "/" + s3_report_name
              if configuration_item:
                  if 'resourceId' in configuration_item:
                      s3path = s3_prefix + '/' + event['configRuleName'] + "/" + event['accountId'] + "/" + \
                          configuration_item['resourceId'] + "/" + str(datetime.date.today()) + "/" + \
                          time.strftime("%H-%M", time.localtime()) + "/" + s3_report_name
              s3client.upload_file('/tmp/' + s3_report_name, s3_bucket, s3path)
              evidence_url = s3_url_prefix + s3_bucket + '?prefix=' + s3path
              return evidence_url
          # Generates a list of image IDs for a set of images
          def generate_image_id_list(images, event):
              image_ids = []
              for image in images:
                  image_ids.append(image['ImageId'])
              return image_ids

          # Config annotations are limited to 256 characters, so truncate longer strings
          def build_annotation(annotation_string):
              if len(annotation_string) > 256:
                  return annotation_string[:244] + " [truncated]"
              return annotation_string

          # Evaluate compliance, then write the report and upload the evidence if any AMIs exist
          def evaluate_compliance(event, configuration_item, valid_rule_parameters):
              ec2_client = get_client('ec2', event)
              print("getting public images")
              try:
                  public_ami_result = ec2_client.describe_images(
                      Filters=[
                          {
                              'Name': 'is-public',
                              'Values': ['true']
                          }
                      ],
                      Owners=[event['accountId']]
                  )
              except botocore.exceptions.ClientError as error:
                  print("Could not get public images. Please check permissions.")
                  raise error
              print(public_ami_result)
              print("getting private AMIs")
              try:
                  private_ami_result = ec2_client.describe_images(
                      Filters=[
                          {
                              'Name': 'is-public',
                              'Values': ['false']
                          }
                      ],
                      Owners=[event['accountId']]
                  )
              except botocore.exceptions.ClientError as error:
                  print("Could not get private images. Please check permissions.")
                  raise error
              print(private_ami_result)
              if not public_ami_result['Images'] and not private_ami_result['Images']:
                  return build_evaluation(event['accountId'], "NOT_APPLICABLE", event)
              writeReport(compliant_evidence=private_ami_result['Images'],
                          non_compliant_evidence=public_ami_result['Images'],
                          event=event)
              evidence_url = upload_evidence(event, configuration_item)
              # If any public AMIs were found, generate a NON_COMPLIANT evaluation
              evaluations = []
              if public_ami_result['Images']:
                  evaluations.append(
                      build_evaluation(
                          event['accountId'],
                          'NON_COMPLIANT',
                          event,
                          annotation=evidence_url
                      )
                  )
                  return evaluations
              return build_evaluation(event['accountId'], "COMPLIANT", event, annotation=evidence_url)

          def evaluate_parameters(rule_parameters):
              valid_rule_parameters = rule_parameters
              return valid_rule_parameters
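          # For reference, an evaluation built by build_evaluation() below has this shape
          # (values here are illustrative examples only):
          # {
          #     "Annotation": "https://s3.console.aws.amazon.com/s3/buckets/<bucket>?prefix=<key>",
          #     "ComplianceResourceType": "AWS::::Account",
          #     "ComplianceResourceId": "111122223333",
          #     "ComplianceType": "NON_COMPLIANT",
          #     "OrderingTimestamp": "2023-01-01T00:00:00.000Z"
          # }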
          ####################
          # Helper Functions #
          ####################

          # Build an error to be displayed in the logs when the parameter is invalid.
          def build_parameters_value_error_response(ex):
              return build_error_response(internal_error_message="Parameter value is invalid",
                                          internal_error_details="A ValueError was raised during the validation of the Parameter value",
                                          customer_error_code="InvalidParameterValueException",
                                          customer_error_message=str(ex))

          # This gets the client after assuming the Config service role
          # either in the same AWS account or cross-account.
          def get_client(service, event):
              if not ASSUME_ROLE_MODE:
                  return boto3.client(service)
              credentials = get_assume_role_credentials(event["executionRoleArn"])
              return boto3.client(service, aws_access_key_id=credentials['AccessKeyId'],
                                  aws_secret_access_key=credentials['SecretAccessKey'],
                                  aws_session_token=credentials['SessionToken']
                                  )

          # This generates an evaluation for Config
          def build_evaluation(resource_id, compliance_type, event, resource_type=DEFAULT_RESOURCE_TYPE, annotation=None):
              eval_cc = {}
              if annotation:
                  eval_cc['Annotation'] = build_annotation(annotation)
              eval_cc['ComplianceResourceType'] = resource_type
              eval_cc['ComplianceResourceId'] = resource_id
              eval_cc['ComplianceType'] = compliance_type
              eval_cc['OrderingTimestamp'] = str(json.loads(event['invokingEvent'])['notificationCreationTime'])
              return eval_cc

          def build_evaluation_from_config_item(configuration_item, compliance_type, annotation=None):
              eval_ci = {}
              if annotation:
                  eval_ci['Annotation'] = build_annotation(annotation)
              eval_ci['ComplianceResourceType'] = configuration_item['resourceType']
              eval_ci['ComplianceResourceId'] = configuration_item['resourceId']
              eval_ci['ComplianceType'] = compliance_type
              eval_ci['OrderingTimestamp'] = configuration_item['configurationItemCaptureTime']
              return eval_ci

          ####################
          # Boilerplate Code #
          ####################

          # Helper function used to validate input
          def check_defined(reference, reference_name):
              if not reference:
                  raise Exception('Error: ', reference_name, 'is not defined')
              return reference

          # Check whether the message is an OversizedConfigurationItemChangeNotification or not
          def is_oversized_changed_notification(message_type):
              check_defined(message_type, 'messageType')
              return message_type == 'OversizedConfigurationItemChangeNotification'

          # Check whether the message is a ScheduledNotification or not.
          def is_scheduled_notification(message_type):
              check_defined(message_type, 'messageType')
              return message_type == 'ScheduledNotification'

          # Get the configurationItem using the getResourceConfigHistory API
          # in case of OversizedConfigurationItemChangeNotification
          def get_configuration(resource_type, resource_id, configuration_capture_time):
              result = AWS_CONFIG_CLIENT.get_resource_config_history(
                  resourceType=resource_type,
                  resourceId=resource_id,
                  laterTime=configuration_capture_time,
                  limit=1)
              configuration_item = result['configurationItems'][0]
              return convert_api_configuration(configuration_item)

          # Convert from the API model to the original invocation model
          def convert_api_configuration(configuration_item):
              for k, v in configuration_item.items():
                  if isinstance(v, datetime.datetime):
                      configuration_item[k] = str(v)
              configuration_item['awsAccountId'] = configuration_item['accountId']
              configuration_item['ARN'] = configuration_item['arn']
              configuration_item['configurationStateMd5Hash'] = configuration_item['configurationItemMD5Hash']
              configuration_item['configurationItemVersion'] = configuration_item['version']
              configuration_item['configuration'] = json.loads(configuration_item['configuration'])
              if 'relationships' in configuration_item:
                  for i in range(len(configuration_item['relationships'])):
                      configuration_item['relationships'][i]['name'] = configuration_item['relationships'][i]['relationshipName']
              return configuration_item

          # Based on the message type, get the configuration item
          # either from the configurationItem in the invoking event
          # or by using the getResourceConfigHistory API in the get_configuration function.
          def get_configuration_item(invoking_event):
              check_defined(invoking_event, 'invokingEvent')
              if is_oversized_changed_notification(invoking_event['messageType']):
                  configuration_item_summary = check_defined(invoking_event['configurationItemSummary'], 'configurationItemSummary')
                  return get_configuration(configuration_item_summary['resourceType'], configuration_item_summary['resourceId'], configuration_item_summary['configurationItemCaptureTime'])
              if is_scheduled_notification(invoking_event['messageType']):
                  return None
              return check_defined(invoking_event['configurationItem'], 'configurationItem')

          # Check whether the resource has been deleted. If it has, the evaluation is unnecessary.
          def is_applicable(configuration_item, event):
              try:
                  check_defined(configuration_item, 'configurationItem')
                  check_defined(event, 'event')
              except Exception:
                  return True
              status = configuration_item['configurationItemStatus']
              event_left_scope = event['eventLeftScope']
              if status == 'ResourceDeleted':
                  print("Resource Deleted, setting Compliance Status to NOT_APPLICABLE.")
              return status in ('OK', 'ResourceDiscovered') and not event_left_scope

          def get_assume_role_credentials(role_arn):
              sts_client = boto3.client('sts')
              try:
                  assume_role_response = sts_client.assume_role(RoleArn=role_arn,
                                                                RoleSessionName="configLambdaExecution",
                                                                DurationSeconds=CONFIG_ROLE_TIMEOUT_SECONDS)
                  if 'liblogging' in sys.modules:
                      liblogging.logSession(role_arn, assume_role_response)
                  return assume_role_response['Credentials']
              except botocore.exceptions.ClientError as ex:
                  # Scrub error message for any internal account info leaks
                  print(str(ex))
                  if 'AccessDenied' in ex.response['Error']['Code']:
                      ex.response['Error']['Message'] = "AWS Config does not have permission to assume the IAM role."
                  else:
                      ex.response['Error']['Message'] = "InternalError"
                      ex.response['Error']['Code'] = "InternalError"
                  raise ex
          # This removes older evaluations (usually useful for periodic rules not reporting on AWS::::Account).
          def clean_up_old_evaluations(latest_evaluations, event):
              cleaned_evaluations = []
              old_eval = AWS_CONFIG_CLIENT.get_compliance_details_by_config_rule(
                  ConfigRuleName=event['configRuleName'],
                  ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'],
                  Limit=100)
              old_eval_list = []
              while True:
                  for old_result in old_eval['EvaluationResults']:
                      old_eval_list.append(old_result)
                  if 'NextToken' in old_eval:
                      next_token = old_eval['NextToken']
                      old_eval = AWS_CONFIG_CLIENT.get_compliance_details_by_config_rule(
                          ConfigRuleName=event['configRuleName'],
                          ComplianceTypes=['COMPLIANT', 'NON_COMPLIANT'],
                          Limit=100,
                          NextToken=next_token)
                  else:
                      break
              for old_eval in old_eval_list:
                  old_resource_id = old_eval['EvaluationResultIdentifier']['EvaluationResultQualifier']['ResourceId']
                  newer_found = False
                  for latest_eval in latest_evaluations:
                      if old_resource_id == latest_eval['ComplianceResourceId']:
                          newer_found = True
                  if not newer_found:
                      cleaned_evaluations.append(build_evaluation(old_resource_id, "NOT_APPLICABLE", event))
              return cleaned_evaluations + latest_evaluations
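          # Illustrative (abridged) shape of the event passed to lambda_handler for a periodic
          # custom rule; field values are examples only:
          # {
          #     "invokingEvent": "{\"awsAccountId\":\"111122223333\",\"notificationCreationTime\":\"2023-01-01T00:00:00.000Z\",\"messageType\":\"ScheduledNotification\"}",
          #     "ruleParameters": "{}",
          #     "resultToken": "...",
          #     "eventLeftScope": false,
          #     "executionRoleArn": "arn:aws:iam::111122223333:role/example-config-role",
          #     "configRuleArn": "arn:aws:config:us-east-1:111122223333:config-rule/config-rule-abcd12",
          #     "configRuleName": "check-public-amis",
          #     "accountId": "111122223333"
          # }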
          def lambda_handler(event, context):
              if 'liblogging' in sys.modules:
                  liblogging.logEvent(event)

              global AWS_CONFIG_CLIENT
              # store the actual account number
              # global AWS_ACCOUNT = event['accountId']
              # global CUSTOM_RULE_NAME = event['configRuleName']

              check_defined(event, 'event')
              invoking_event = json.loads(event['invokingEvent'])
              rule_parameters = {}
              if 'ruleParameters' in event:
                  rule_parameters = json.loads(event['ruleParameters'])

              try:
                  valid_rule_parameters = evaluate_parameters(rule_parameters)
              except ValueError as ex:
                  return build_parameters_value_error_response(ex)

              try:
                  AWS_CONFIG_CLIENT = get_client('config', event)
                  if invoking_event['messageType'] in ['ConfigurationItemChangeNotification', 'ScheduledNotification',
                                                       'OversizedConfigurationItemChangeNotification']:
                      print("getting conf item")
                      configuration_item = get_configuration_item(invoking_event)
                      print(configuration_item)
                      if is_applicable(configuration_item, event):
                          print("getting comp results")
                          compliance_result = evaluate_compliance(event, configuration_item, valid_rule_parameters)
                      else:
                          compliance_result = "NOT_APPLICABLE"
                  else:
                      return build_internal_error_response('Unexpected message type', str(invoking_event))
              except botocore.exceptions.ClientError as ex:
                  if is_internal_error(ex):
                      return build_internal_error_response("Unexpected error while completing API request", str(ex))
                  return build_error_response("Customer error while making API request",
                                              str(ex),
                                              ex.response['Error']['Code'],
                                              ex.response['Error']['Message'])
              except ValueError as ex:
                  return build_internal_error_response(str(ex), str(ex))

              evaluations = []
              latest_evaluations = []

              if not compliance_result:
                  latest_evaluations.append(
                      build_evaluation(event['accountId'], "NOT_APPLICABLE", event, resource_type='AWS::::Account'))
                  evaluations = clean_up_old_evaluations(latest_evaluations, event)
              elif isinstance(compliance_result, str):
                  if configuration_item:
                      evaluations.append(build_evaluation_from_config_item(configuration_item, compliance_result))
                  else:
                      print(*compliance_result)
                      evaluations.append(
                          build_evaluation(event['accountId'], compliance_result, event, resource_type=DEFAULT_RESOURCE_TYPE))
              elif isinstance(compliance_result, list):
                  for evaluation in compliance_result:
                      missing_fields = False
                      for field in ('ComplianceResourceType', 'ComplianceResourceId', 'ComplianceType', 'OrderingTimestamp'):
                          if field not in evaluation:
                              print("Missing " + field + " from custom evaluation.")
                              missing_fields = True
                      if not missing_fields:
                          latest_evaluations.append(evaluation)
                  evaluations = clean_up_old_evaluations(latest_evaluations, event)
              elif isinstance(compliance_result, dict):
                  missing_fields = False
                  for field in ('ComplianceResourceType', 'ComplianceResourceId', 'ComplianceType', 'OrderingTimestamp'):
                      if field not in compliance_result:
                          print("Missing " + field + " from custom evaluation.")
                          missing_fields = True
                  if not missing_fields:
                      evaluations.append(compliance_result)
              else:
                  evaluations.append(build_evaluation_from_config_item(configuration_item, 'NOT_APPLICABLE'))

              # Put together the request that reports the evaluation status
              result_token = event['resultToken']
              test_mode = False
              if result_token == 'TESTMODE':
                  # Used solely for RDK tests to skip the actual put_evaluations API call
                  test_mode = True

              # Invoke the Config API to report the result of the evaluation
              evaluation_copy = []
              evaluation_copy = evaluations[:]
              while evaluation_copy:
                  AWS_CONFIG_CLIENT.put_evaluations(Evaluations=evaluation_copy[:100], ResultToken=result_token, TestMode=test_mode)
                  del evaluation_copy[:100]

              # Used solely for RDK tests to be able to test the Lambda function
              return evaluations

          def is_internal_error(exception):
              return ((not isinstance(exception, botocore.exceptions.ClientError)) or
                      exception.response['Error']['Code'].startswith('5') or
                      'InternalError' in exception.response['Error']['Code'] or
                      'ServiceError' in exception.response['Error']['Code'])

          def build_internal_error_response(internal_error_message, internal_error_details=None):
              return build_error_response(internal_error_message, internal_error_details, 'InternalError', 'InternalError')

          def build_error_response(internal_error_message, internal_error_details=None, customer_error_code=None, customer_error_message=None):
              error_response = {
                  'internalErrorMessage': internal_error_message,
                  'internalErrorDetails': internal_error_details,
                  'customerErrorMessage': customer_error_message,
                  'customerErrorCode': customer_error_code
              }
              print(error_response)
              return error_response
      Runtime: python3.9
  ConfigRuleLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: "/"
      Policies:
        - PolicyName: ConfigRuleLambdaPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogStream
                Resource: !Sub "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${ConfigRuleName}:*"
              - Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource: !Sub "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${ConfigRuleName}:log-stream:*"
              - Effect: Allow
                Action:
                  - ssm:GetParameter
                Resource: !Sub "arn:${AWS::Partition}:ssm:${AWS::Region}:${AWS::AccountId}:parameter/AuditManagerEvidenceAutomation/*"
              - Effect: Allow
                Action:
                  - ec2:DescribeImages
                Resource: '*'
              - Effect: Allow
                Action:
                  - config:PutEvaluations
                  - config:DescribeConfigRules
                  - config:DescribeComplianceByConfigRule
                  - config:DescribeConfigRuleEvaluationStatus
                  - config:DeleteEvaluationResults
                  - config:StartConfigRulesEvaluation
                  - config:ListTagsForResource
                  - config:GetComplianceDetailsByConfigRule
                Resource: '*'
              - Effect: Allow
                Action:
                  - s3:PutObject
                Resource: !Sub "arn:${AWS::Partition}:s3:::${S3BucketName}/*"
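  # Note: the evidence bucket itself is not created by this template. S3BucketName must refer to
  # an existing bucket; the role above only grants s3:PutObject on objects within it.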
  LambdaLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: !Sub "/aws/lambda/${ConfigRuleName}"
      RetentionInDays: 30
  ConfigRule:
    Type: AWS::Config::ConfigRule
    Properties:
      Description: "Custom Config rule that invokes the Lambda function to check for public AMIs"
      ConfigRuleName: !Ref ConfigRuleName
      Source:
        Owner: CUSTOM_LAMBDA
        SourceDetails:
          - EventSource: aws.config
            MessageType: ScheduledNotification
        SourceIdentifier:
          Fn::GetAtt:
            - "ConfigRuleFunction"
            - "Arn"
      MaximumExecutionFrequency: TwentyFour_Hours
    DependsOn: PermissionForEventsToInvokeLambda
  PermissionForEventsToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref "ConfigRuleFunction"
      Action: "lambda:InvokeFunction"
      Principal: "config.amazonaws.com"
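# The Lambda function reads the ControlID, ControlText, Framework and RequestedInfo tags on the
# Config rule to populate the evidence report header. This template does not create those tags;
# one way to add them after deployment is the Config TagResource API, e.g. (illustrative values
# only, replace the ARN and tag values with your own):
#
#   aws configservice tag-resource \
#     --resource-arn arn:aws:config:us-east-1:111122223333:config-rule/config-rule-abcd12 \
#     --tags Key=ControlID,Value=Example-1.1 Key=Framework,Value=ExampleFramework \
#            Key=ControlText,Value="No public AMIs allowed" Key=RequestedInfo,Value="Public AMI evidence"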