// Jest Snapshot v1, https://goo.gl/fbAQLP exports[`Member Stack - AFSBP 1`] = ` { "Conditions": { "EnableEC21Condition": { "Fn::Equals": [ { "Ref": "EnableEC21", }, "Available", ], }, "EnableLambda1Condition": { "Fn::Equals": [ { "Ref": "EnableLambda1", }, "Available", ], }, "EnableRDS1Condition": { "Fn::Equals": [ { "Ref": "EnableRDS1", }, "Available", ], }, }, "Description": "test;", "Parameters": { "EnableEC21": { "AllowedValues": [ "Available", "NOT Available", ], "Default": "Available", "Description": "Enable/disable availability of remediation for AFSBP version 1.0.0 Control EC2.1 in Security Hub Console Custom Actions. If NOT Available the remediation cannot be triggered from the Security Hub console in the Security Hub Admin account.", "Type": "String", }, "EnableLambda1": { "AllowedValues": [ "Available", "NOT Available", ], "Default": "Available", "Description": "Enable/disable availability of remediation for AFSBP version 1.0.0 Control Lambda.1 in Security Hub Console Custom Actions. If NOT Available the remediation cannot be triggered from the Security Hub console in the Security Hub Admin account.", "Type": "String", }, "EnableRDS1": { "AllowedValues": [ "Available", "NOT Available", ], "Default": "Available", "Description": "Enable/disable availability of remediation for AFSBP version 1.0.0 Control RDS.1 in Security Hub Console Custom Actions. If NOT Available the remediation cannot be triggered from the Security Hub console in the Security Hub Admin account.", "Type": "String", }, "SecHubAdminAccount": { "AllowedPattern": "^\\d{12}$", "Description": "Admin account number", "Type": "String", }, "WaitProviderServiceToken": { "Type": "String", }, }, "Resources": { "ControlAFSBPEC21": { "Condition": "EnableEC21Condition", "DependsOn": [ "CreateWait0", ], "Properties": { "Content": { "assumeRole": "{{ AutomationAssumeRole }}", "description": "### Document Name - ASR-AFSBP_1.0.0_EC2.1 ## What does this document do? This document changes all public EC2 snapshots to private ## Input Parameters * Finding: (Required) Security Hub finding details JSON * AutomationAssumeRole: (Required) The ARN of the role that allows Automation to perform the actions on your behalf. ## Documentation Links * [AFSBP EC2.1](https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-standards-fsbp-controls.html#fsbp-ec2-1) ", "mainSteps": [ { "action": "aws:executeScript", "inputs": { "Handler": "parse_event", "InputPayload": { "Finding": "{{Finding}}", "expected_control_id": [ "EC2.1", ], "parse_id_pattern": "", "resource_index": 2, }, "Runtime": "python3.8", "Script": "# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import re import json import boto3 from botocore.config import Config def connect_to_config(boto_config): return boto3.client('config', config=boto_config) def connect_to_ssm(boto_config): return boto3.client('ssm', config=boto_config) def get_solution_id(): return 'SO0111' def get_solution_version(): ssm = connect_to_ssm( Config( retries = { 'mode': 'standard' }, user_agent_extra = f'AwsSolution/{get_solution_id()}/unknown' ) ) solution_version = 'unknown' try: ssm_parm_value = ssm.get_parameter( Name=f'/Solutions/{get_solution_id()}/member-version' )['Parameter'].get('Value', 'unknown') solution_version = ssm_parm_value except Exception as e: print(e) print(f'ERROR getting solution version') return solution_version def get_shortname(long_name): short_name = { 'aws-foundational-security-best-practices': 'AFSBP', 'cis-aws-foundations-benchmark': 'CIS', 'pci-dss': 'PCI', 'security-control': 'SC' } return short_name.get(long_name, None) def get_config_rule(rule_name): boto_config = Config( retries = { 'mode': 'standard' }, user_agent_extra = f'AwsSolution/{get_solution_id()}/{get_solution_version()}' ) config_rule = None try: configsvc = connect_to_config(boto_config) config_rule = configsvc.describe_config_rules( ConfigRuleNames=[ rule_name ] ).get('ConfigRules', [])[0] except Exception as e: print(e) exit(f'ERROR getting config rule {rule_name}') return config_rule class FindingEvent: """ Finding object returns the parse fields from an input finding json object """ def _get_resource_id(self, parse_id_pattern, resource_index): identifier_raw = self.finding_json['Resources'][0]['Id'] self.resource_id = identifier_raw self.resource_id_matches = [] if parse_id_pattern: identifier_match = re.match( parse_id_pattern, identifier_raw ) if identifier_match: for group in range(1, len(identifier_match.groups())+1): self.resource_id_matches.append(identifier_match.group(group)) self.resource_id = identifier_match.group(resource_index) else: exit(f'ERROR: Invalid resource Id {identifier_raw}') def _get_sc_check(self): match_finding_id = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:'+ 'security-control/(.*)/finding/(?i:[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12})$', self.finding_json['Id'] ) if match_finding_id: self.standard_id = get_shortname('security-control') self.control_id = match_finding_id.group(1) return match_finding_id def _get_standard_info(self): match_finding_id = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:'+ 'subscription/(.*?)/v/(\\d+\\.\\d+\\.\\d+)/(.*)/finding/(?i:[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12})$', self.finding_json['Id'] ) if match_finding_id: self.standard_id = get_shortname(match_finding_id.group(1)) self.standard_version = match_finding_id.group(2) self.control_id = match_finding_id.group(3) else: match_sc_finding_id = self._get_sc_check() if not match_sc_finding_id: self.valid_finding = False self.invalid_finding_reason = f'Finding Id is invalid: {self.finding_json["Id"]}' def _get_aws_config_rule(self): # config_rule_id refers to the AWS Config Rule that produced the finding if "RelatedAWSResources:0/type" in self.finding_json['ProductFields'] and self.finding_json['ProductFields']['RelatedAWSResources:0/type'] == 'AWS::Config::ConfigRule': self.aws_config_rule_id = self.finding_json['ProductFields']['RelatedAWSResources:0/name'] self.aws_config_rule = get_config_rule(self.aws_config_rule_id) def _get_region_from_resource_id(self): check_for_region = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):[a-zA-Z0-9]+:([a-z]{2}(?:-gov)?-[a-z]+-\\d):.*:.*$', self.finding_json['Resources'][0]['Id'] ) if check_for_region: self.resource_region = check_for_region.group(1) def __init__(self, finding_json, parse_id_pattern, expected_control_id, resource_index): self.valid_finding = True self.resource_region = None self.control_id = None self.aws_config_rule_id = None self.aws_config_rule = {} """Populate fields""" # v1.5 self.finding_json = finding_json self._get_resource_id(parse_id_pattern, resource_index) # self.resource_id, self.resource_id_matches self._get_standard_info() # self.standard_id, self.standard_version, self.control_id # V1.4 self.account_id = self.finding_json.get('AwsAccountId', None) # deprecate - get Finding.AwsAccountId if not re.match(r'^\\d{12}$', self.account_id) and self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'AwsAccountId is invalid: {self.account_id}' self.finding_id = self.finding_json.get('Id', None) # deprecate self.product_arn = self.finding_json.get('ProductArn', None) if not re.match(r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:[a-z]{2}(?:-gov)?-[a-z]+-\\d::product/aws/securityhub$', self.product_arn): if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'ProductArn is invalid: {self.product_arn}' self.details = self.finding_json['Resources'][0].get('Details', {}) # Test mode is used with fabricated finding data to tell the # remediation runbook to run in test more (where supported) # Currently not widely-used and perhaps should be deprecated. self.testmode = bool('testmode' in self.finding_json) self.resource = self.finding_json['Resources'][0] self._get_region_from_resource_id() self._get_aws_config_rule() self.affected_object = {'Type': self.resource['Type'], 'Id': self.resource_id, 'OutputKey': 'Remediation.Output'} # Validate control_id if not self.control_id: if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'Finding Id is invalid: {self.finding_json["Id"]} - missing Control Id' elif self.control_id not in expected_control_id: # ControlId is the expected value if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'Control Id from input ({self.control_id}) does not match {str(expected_control_id)}' if not self.resource_id and self.valid_finding: self.valid_finding = False self.invalid_finding_reason = 'Resource Id is missing from the finding json Resources (Id)' if not self.valid_finding: # Error message and return error data msg = f'ERROR: {self.invalid_finding_reason}' exit(msg) def __str__(self): return json.dumps(self.__dict__) ''' MAIN ''' def parse_event(event, _): finding_event = FindingEvent(event['Finding'], event['parse_id_pattern'], event['expected_control_id'], event.get('resource_index', 1)) if not finding_event.valid_finding: exit('ERROR: Finding is not valid') return { "account_id": finding_event.account_id, "resource_id": finding_event.resource_id, "finding_id": finding_event.finding_id, # Deprecate v1.5.0+ "control_id": finding_event.control_id, "product_arn": finding_event.product_arn, # Deprecate v1.5.0+ "object": finding_event.affected_object, "matches": finding_event.resource_id_matches, "details": finding_event.details, # Deprecate v1.5.0+ "testmode": finding_event.testmode, # Deprecate v1.5.0+ "resource": finding_event.resource, "resource_region": finding_event.resource_region, "finding": finding_event.finding_json, "aws_config_rule": finding_event.aws_config_rule }", }, "isEnd": false, "name": "ParseInput", "outputs": [ { "Name": "FindingId", "Selector": "$.Payload.finding_id", "Type": "String", }, { "Name": "ProductArn", "Selector": "$.Payload.product_arn", "Type": "String", }, { "Name": "AffectedObject", "Selector": "$.Payload.object", "Type": "StringMap", }, { "Name": "AccountId", "Selector": "$.Payload.account_id", "Type": "String", }, { "Name": "TestMode", "Selector": "$.Payload.testmode", "Type": "Boolean", }, ], }, { "action": "aws:executeAutomation", "inputs": { "DocumentName": "ASR-MakeEBSSnapshotsPrivate", "RuntimeParameters": { "AccountId": "{{ParseInput.AccountId}}", "AutomationAssumeRole": "arn:{{global:AWS_PARTITION}}:iam::{{global:ACCOUNT_ID}}:role/SO0111-MakeEBSSnapshotsPrivate", "TestMode": "{{ParseInput.TestMode}}", }, }, "isEnd": false, "name": "Remediation", }, { "action": "aws:executeAwsApi", "description": "Update finding", "inputs": { "Api": "BatchUpdateFindings", "FindingIdentifiers": [ { "Id": "{{ParseInput.FindingId}}", "ProductArn": "{{ParseInput.ProductArn}}", }, ], "Note": { "Text": "EBS Snapshot modified to private", "UpdatedBy": "ASR-AFSBP_1.0.0_EC2.1", }, "Service": "securityhub", "Workflow": { "Status": "RESOLVED", }, }, "isEnd": true, "name": "UpdateFinding", }, ], "outputs": [ "Remediation.Output", "ParseInput.AffectedObject", ], "parameters": { "AutomationAssumeRole": { "allowedPattern": "^arn:(?:aws|aws-us-gov|aws-cn):iam::\\d{12}:role/[\\w+=,.@-]+$", "description": "(Required) The ARN of the role that allows Automation to perform the actions on your behalf.", "type": "String", }, "Finding": { "description": "The input from the Orchestrator Step function for the EC2.1 finding", "type": "StringMap", }, }, "schemaVersion": "0.3", }, "DocumentFormat": "YAML", "DocumentType": "Automation", "Name": "ASR-AFSBP_1.0.0_EC2.1", "UpdateMethod": "NewVersion", }, "Type": "AWS::SSM::Document", }, "ControlAFSBPLambda1": { "Condition": "EnableLambda1Condition", "DependsOn": [ "CreateWait0", ], "Properties": { "Content": { "assumeRole": "{{ AutomationAssumeRole }}", "description": "### Document Name - ASR-AFSBP_1.0.0_Lambda.1 ## What does this document do? This document removes the public resource policy. A public resource policy contains a principal "*" or AWS: "*", which allows public access to the function. The remediation is to remove the SID of the public policy. ## Input Parameters * Finding: (Required) Security Hub finding details JSON * AutomationAssumeRole: (Required) The ARN of the role that allows Automation to perform the actions on your behalf. ## Documentation Links * [AFSBP Lambda.1](https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-standards-fsbp-controls.html#fsbp-lambda-1) ", "mainSteps": [ { "action": "aws:executeScript", "inputs": { "Handler": "parse_event", "InputPayload": { "Finding": "{{Finding}}", "expected_control_id": [ "Lambda.1", ], "parse_id_pattern": "^arn:(?:aws|aws-us-gov|aws-cn):lambda:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:function:([a-zA-Z0-9\\-_]{1,64})$", }, "Runtime": "python3.8", "Script": "# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import re import json import boto3 from botocore.config import Config def connect_to_config(boto_config): return boto3.client('config', config=boto_config) def connect_to_ssm(boto_config): return boto3.client('ssm', config=boto_config) def get_solution_id(): return 'SO0111' def get_solution_version(): ssm = connect_to_ssm( Config( retries = { 'mode': 'standard' }, user_agent_extra = f'AwsSolution/{get_solution_id()}/unknown' ) ) solution_version = 'unknown' try: ssm_parm_value = ssm.get_parameter( Name=f'/Solutions/{get_solution_id()}/member-version' )['Parameter'].get('Value', 'unknown') solution_version = ssm_parm_value except Exception as e: print(e) print(f'ERROR getting solution version') return solution_version def get_shortname(long_name): short_name = { 'aws-foundational-security-best-practices': 'AFSBP', 'cis-aws-foundations-benchmark': 'CIS', 'pci-dss': 'PCI', 'security-control': 'SC' } return short_name.get(long_name, None) def get_config_rule(rule_name): boto_config = Config( retries = { 'mode': 'standard' }, user_agent_extra = f'AwsSolution/{get_solution_id()}/{get_solution_version()}' ) config_rule = None try: configsvc = connect_to_config(boto_config) config_rule = configsvc.describe_config_rules( ConfigRuleNames=[ rule_name ] ).get('ConfigRules', [])[0] except Exception as e: print(e) exit(f'ERROR getting config rule {rule_name}') return config_rule class FindingEvent: """ Finding object returns the parse fields from an input finding json object """ def _get_resource_id(self, parse_id_pattern, resource_index): identifier_raw = self.finding_json['Resources'][0]['Id'] self.resource_id = identifier_raw self.resource_id_matches = [] if parse_id_pattern: identifier_match = re.match( parse_id_pattern, identifier_raw ) if identifier_match: for group in range(1, len(identifier_match.groups())+1): self.resource_id_matches.append(identifier_match.group(group)) self.resource_id = identifier_match.group(resource_index) else: exit(f'ERROR: Invalid resource Id {identifier_raw}') def _get_sc_check(self): match_finding_id = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:'+ 'security-control/(.*)/finding/(?i:[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12})$', self.finding_json['Id'] ) if match_finding_id: self.standard_id = get_shortname('security-control') self.control_id = match_finding_id.group(1) return match_finding_id def _get_standard_info(self): match_finding_id = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:'+ 'subscription/(.*?)/v/(\\d+\\.\\d+\\.\\d+)/(.*)/finding/(?i:[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12})$', self.finding_json['Id'] ) if match_finding_id: self.standard_id = get_shortname(match_finding_id.group(1)) self.standard_version = match_finding_id.group(2) self.control_id = match_finding_id.group(3) else: match_sc_finding_id = self._get_sc_check() if not match_sc_finding_id: self.valid_finding = False self.invalid_finding_reason = f'Finding Id is invalid: {self.finding_json["Id"]}' def _get_aws_config_rule(self): # config_rule_id refers to the AWS Config Rule that produced the finding if "RelatedAWSResources:0/type" in self.finding_json['ProductFields'] and self.finding_json['ProductFields']['RelatedAWSResources:0/type'] == 'AWS::Config::ConfigRule': self.aws_config_rule_id = self.finding_json['ProductFields']['RelatedAWSResources:0/name'] self.aws_config_rule = get_config_rule(self.aws_config_rule_id) def _get_region_from_resource_id(self): check_for_region = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):[a-zA-Z0-9]+:([a-z]{2}(?:-gov)?-[a-z]+-\\d):.*:.*$', self.finding_json['Resources'][0]['Id'] ) if check_for_region: self.resource_region = check_for_region.group(1) def __init__(self, finding_json, parse_id_pattern, expected_control_id, resource_index): self.valid_finding = True self.resource_region = None self.control_id = None self.aws_config_rule_id = None self.aws_config_rule = {} """Populate fields""" # v1.5 self.finding_json = finding_json self._get_resource_id(parse_id_pattern, resource_index) # self.resource_id, self.resource_id_matches self._get_standard_info() # self.standard_id, self.standard_version, self.control_id # V1.4 self.account_id = self.finding_json.get('AwsAccountId', None) # deprecate - get Finding.AwsAccountId if not re.match(r'^\\d{12}$', self.account_id) and self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'AwsAccountId is invalid: {self.account_id}' self.finding_id = self.finding_json.get('Id', None) # deprecate self.product_arn = self.finding_json.get('ProductArn', None) if not re.match(r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:[a-z]{2}(?:-gov)?-[a-z]+-\\d::product/aws/securityhub$', self.product_arn): if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'ProductArn is invalid: {self.product_arn}' self.details = self.finding_json['Resources'][0].get('Details', {}) # Test mode is used with fabricated finding data to tell the # remediation runbook to run in test more (where supported) # Currently not widely-used and perhaps should be deprecated. self.testmode = bool('testmode' in self.finding_json) self.resource = self.finding_json['Resources'][0] self._get_region_from_resource_id() self._get_aws_config_rule() self.affected_object = {'Type': self.resource['Type'], 'Id': self.resource_id, 'OutputKey': 'Remediation.Output'} # Validate control_id if not self.control_id: if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'Finding Id is invalid: {self.finding_json["Id"]} - missing Control Id' elif self.control_id not in expected_control_id: # ControlId is the expected value if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'Control Id from input ({self.control_id}) does not match {str(expected_control_id)}' if not self.resource_id and self.valid_finding: self.valid_finding = False self.invalid_finding_reason = 'Resource Id is missing from the finding json Resources (Id)' if not self.valid_finding: # Error message and return error data msg = f'ERROR: {self.invalid_finding_reason}' exit(msg) def __str__(self): return json.dumps(self.__dict__) ''' MAIN ''' def parse_event(event, _): finding_event = FindingEvent(event['Finding'], event['parse_id_pattern'], event['expected_control_id'], event.get('resource_index', 1)) if not finding_event.valid_finding: exit('ERROR: Finding is not valid') return { "account_id": finding_event.account_id, "resource_id": finding_event.resource_id, "finding_id": finding_event.finding_id, # Deprecate v1.5.0+ "control_id": finding_event.control_id, "product_arn": finding_event.product_arn, # Deprecate v1.5.0+ "object": finding_event.affected_object, "matches": finding_event.resource_id_matches, "details": finding_event.details, # Deprecate v1.5.0+ "testmode": finding_event.testmode, # Deprecate v1.5.0+ "resource": finding_event.resource, "resource_region": finding_event.resource_region, "finding": finding_event.finding_json, "aws_config_rule": finding_event.aws_config_rule }", }, "name": "ParseInput", "outputs": [ { "Name": "FindingId", "Selector": "$.Payload.finding_id", "Type": "String", }, { "Name": "ProductArn", "Selector": "$.Payload.product_arn", "Type": "String", }, { "Name": "AffectedObject", "Selector": "$.Payload.object", "Type": "StringMap", }, { "Name": "FunctionName", "Selector": "$.Payload.resource_id", "Type": "String", }, { "Name": "RemediationRegion", "Selector": "$.Payload.resource_region", "Type": "String", }, { "Name": "RemediationAccount", "Selector": "$.Payload.account_id", "Type": "String", }, ], }, { "action": "aws:executeAutomation", "inputs": { "DocumentName": "ASR-RemoveLambdaPublicAccess", "RuntimeParameters": { "AutomationAssumeRole": "arn:{{global:AWS_PARTITION}}:iam::{{global:ACCOUNT_ID}}:role/{{RemediationRoleName}}", "FunctionName": "{{ ParseInput.FunctionName }}", }, "TargetLocations": [ { "Accounts": [ "{{ParseInput.RemediationAccount}}", ], "ExecutionRoleName": "{{RemediationRoleName}}", "Regions": [ "{{ParseInput.RemediationRegion}}", ], }, ], }, "name": "Remediation", }, { "action": "aws:executeAwsApi", "description": "Update finding", "inputs": { "Api": "BatchUpdateFindings", "FindingIdentifiers": [ { "Id": "{{ParseInput.FindingId}}", "ProductArn": "{{ParseInput.ProductArn}}", }, ], "Note": { "Text": "Lamdba {{ParseInput.FunctionName}} policy updated to remove public access", "UpdatedBy": "ASR-AFSBP_1.0.0_Lambda.1", }, "Service": "securityhub", "Workflow": { "Status": "RESOLVED", }, }, "isEnd": true, "name": "UpdateFinding", }, ], "outputs": [ "Remediation.Output", "ParseInput.AffectedObject", ], "parameters": { "AutomationAssumeRole": { "allowedPattern": "^arn:(?:aws|aws-us-gov|aws-cn):iam::\\d{12}:role/[\\w+=,.@-]+$", "description": "(Required) The ARN of the role that allows Automation to perform the actions on your behalf.", "type": "String", }, "Finding": { "description": "The input from the Orchestrator Step function for the Lambda.1 finding", "type": "StringMap", }, "RemediationRoleName": { "allowedPattern": "^[\\w+=,.@-]+$", "default": "SO0111-RemoveLambdaPublicAccess", "type": "String", }, }, "schemaVersion": "0.3", }, "DocumentFormat": "YAML", "DocumentType": "Automation", "Name": "ASR-AFSBP_1.0.0_Lambda.1", "UpdateMethod": "NewVersion", }, "Type": "AWS::SSM::Document", }, "ControlAFSBPRDS1": { "Condition": "EnableRDS1Condition", "DependsOn": [ "CreateWait0", ], "Properties": { "Content": { "assumeRole": "{{ AutomationAssumeRole }}", "description": "### Document Name - ASR-AFSBP_1.0.0_RDS.1 ## What does this document do? This document changes public RDS snapshot to private ## Input Parameters * Finding: (Required) Security Hub finding details JSON * AutomationAssumeRole: (Required) The ARN of the role that allows Automation to perform the actions on your behalf. ## Documentation Links * [AFSBP RDS.1](https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-standards-fsbp-controls.html#fsbp-rds-1) ", "mainSteps": [ { "action": "aws:executeScript", "inputs": { "Handler": "parse_event", "InputPayload": { "Finding": "{{Finding}}", "expected_control_id": [ "RDS.1", ], "parse_id_pattern": "^arn:(?:aws|aws-cn|aws-us-gov):rds:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:(cluster-snapshot|snapshot):([a-zA-Z](?:[0-9a-zA-Z]+-)*[0-9a-zA-Z]+)$", "resource_index": 2, }, "Runtime": "python3.8", "Script": "# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import re import json import boto3 from botocore.config import Config def connect_to_config(boto_config): return boto3.client('config', config=boto_config) def connect_to_ssm(boto_config): return boto3.client('ssm', config=boto_config) def get_solution_id(): return 'SO0111' def get_solution_version(): ssm = connect_to_ssm( Config( retries = { 'mode': 'standard' }, user_agent_extra = f'AwsSolution/{get_solution_id()}/unknown' ) ) solution_version = 'unknown' try: ssm_parm_value = ssm.get_parameter( Name=f'/Solutions/{get_solution_id()}/member-version' )['Parameter'].get('Value', 'unknown') solution_version = ssm_parm_value except Exception as e: print(e) print(f'ERROR getting solution version') return solution_version def get_shortname(long_name): short_name = { 'aws-foundational-security-best-practices': 'AFSBP', 'cis-aws-foundations-benchmark': 'CIS', 'pci-dss': 'PCI', 'security-control': 'SC' } return short_name.get(long_name, None) def get_config_rule(rule_name): boto_config = Config( retries = { 'mode': 'standard' }, user_agent_extra = f'AwsSolution/{get_solution_id()}/{get_solution_version()}' ) config_rule = None try: configsvc = connect_to_config(boto_config) config_rule = configsvc.describe_config_rules( ConfigRuleNames=[ rule_name ] ).get('ConfigRules', [])[0] except Exception as e: print(e) exit(f'ERROR getting config rule {rule_name}') return config_rule class FindingEvent: """ Finding object returns the parse fields from an input finding json object """ def _get_resource_id(self, parse_id_pattern, resource_index): identifier_raw = self.finding_json['Resources'][0]['Id'] self.resource_id = identifier_raw self.resource_id_matches = [] if parse_id_pattern: identifier_match = re.match( parse_id_pattern, identifier_raw ) if identifier_match: for group in range(1, len(identifier_match.groups())+1): self.resource_id_matches.append(identifier_match.group(group)) self.resource_id = identifier_match.group(resource_index) else: exit(f'ERROR: Invalid resource Id {identifier_raw}') def _get_sc_check(self): match_finding_id = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:'+ 'security-control/(.*)/finding/(?i:[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12})$', self.finding_json['Id'] ) if match_finding_id: self.standard_id = get_shortname('security-control') self.control_id = match_finding_id.group(1) return match_finding_id def _get_standard_info(self): match_finding_id = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:'+ 'subscription/(.*?)/v/(\\d+\\.\\d+\\.\\d+)/(.*)/finding/(?i:[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12})$', self.finding_json['Id'] ) if match_finding_id: self.standard_id = get_shortname(match_finding_id.group(1)) self.standard_version = match_finding_id.group(2) self.control_id = match_finding_id.group(3) else: match_sc_finding_id = self._get_sc_check() if not match_sc_finding_id: self.valid_finding = False self.invalid_finding_reason = f'Finding Id is invalid: {self.finding_json["Id"]}' def _get_aws_config_rule(self): # config_rule_id refers to the AWS Config Rule that produced the finding if "RelatedAWSResources:0/type" in self.finding_json['ProductFields'] and self.finding_json['ProductFields']['RelatedAWSResources:0/type'] == 'AWS::Config::ConfigRule': self.aws_config_rule_id = self.finding_json['ProductFields']['RelatedAWSResources:0/name'] self.aws_config_rule = get_config_rule(self.aws_config_rule_id) def _get_region_from_resource_id(self): check_for_region = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):[a-zA-Z0-9]+:([a-z]{2}(?:-gov)?-[a-z]+-\\d):.*:.*$', self.finding_json['Resources'][0]['Id'] ) if check_for_region: self.resource_region = check_for_region.group(1) def __init__(self, finding_json, parse_id_pattern, expected_control_id, resource_index): self.valid_finding = True self.resource_region = None self.control_id = None self.aws_config_rule_id = None self.aws_config_rule = {} """Populate fields""" # v1.5 self.finding_json = finding_json self._get_resource_id(parse_id_pattern, resource_index) # self.resource_id, self.resource_id_matches self._get_standard_info() # self.standard_id, self.standard_version, self.control_id # V1.4 self.account_id = self.finding_json.get('AwsAccountId', None) # deprecate - get Finding.AwsAccountId if not re.match(r'^\\d{12}$', self.account_id) and self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'AwsAccountId is invalid: {self.account_id}' self.finding_id = self.finding_json.get('Id', None) # deprecate self.product_arn = self.finding_json.get('ProductArn', None) if not re.match(r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:[a-z]{2}(?:-gov)?-[a-z]+-\\d::product/aws/securityhub$', self.product_arn): if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'ProductArn is invalid: {self.product_arn}' self.details = self.finding_json['Resources'][0].get('Details', {}) # Test mode is used with fabricated finding data to tell the # remediation runbook to run in test more (where supported) # Currently not widely-used and perhaps should be deprecated. self.testmode = bool('testmode' in self.finding_json) self.resource = self.finding_json['Resources'][0] self._get_region_from_resource_id() self._get_aws_config_rule() self.affected_object = {'Type': self.resource['Type'], 'Id': self.resource_id, 'OutputKey': 'Remediation.Output'} # Validate control_id if not self.control_id: if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'Finding Id is invalid: {self.finding_json["Id"]} - missing Control Id' elif self.control_id not in expected_control_id: # ControlId is the expected value if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'Control Id from input ({self.control_id}) does not match {str(expected_control_id)}' if not self.resource_id and self.valid_finding: self.valid_finding = False self.invalid_finding_reason = 'Resource Id is missing from the finding json Resources (Id)' if not self.valid_finding: # Error message and return error data msg = f'ERROR: {self.invalid_finding_reason}' exit(msg) def __str__(self): return json.dumps(self.__dict__) ''' MAIN ''' def parse_event(event, _): finding_event = FindingEvent(event['Finding'], event['parse_id_pattern'], event['expected_control_id'], event.get('resource_index', 1)) if not finding_event.valid_finding: exit('ERROR: Finding is not valid') return { "account_id": finding_event.account_id, "resource_id": finding_event.resource_id, "finding_id": finding_event.finding_id, # Deprecate v1.5.0+ "control_id": finding_event.control_id, "product_arn": finding_event.product_arn, # Deprecate v1.5.0+ "object": finding_event.affected_object, "matches": finding_event.resource_id_matches, "details": finding_event.details, # Deprecate v1.5.0+ "testmode": finding_event.testmode, # Deprecate v1.5.0+ "resource": finding_event.resource, "resource_region": finding_event.resource_region, "finding": finding_event.finding_json, "aws_config_rule": finding_event.aws_config_rule }", }, "name": "ParseInput", "nextStep": "Remediation", "outputs": [ { "Name": "DBSnapshotId", "Selector": "$.Payload.resource_id", "Type": "String", }, { "Name": "DBSnapshotType", "Selector": "$.Payload.matches[0]", "Type": "String", }, { "Name": "FindingId", "Selector": "$.Payload.finding_id", "Type": "String", }, { "Name": "ProductArn", "Selector": "$.Payload.product_arn", "Type": "String", }, { "Name": "AffectedObject", "Selector": "$.Payload.object", "Type": "StringMap", }, { "Name": "Type", "Selector": "$.Payload.type", "Type": "String", }, { "Name": "RemediationRegion", "Selector": "$.Payload.resource_region", "Type": "String", }, { "Name": "RemediationAccount", "Selector": "$.Payload.account_id", "Type": "String", }, ], }, { "action": "aws:executeAutomation", "inputs": { "DocumentName": "ASR-MakeRDSSnapshotPrivate", "RuntimeParameters": { "AutomationAssumeRole": "arn:{{global:AWS_PARTITION}}:iam::{{global:ACCOUNT_ID}}:role/{{RemediationRoleName}}", "DBSnapshotId": "{{ParseInput.DBSnapshotId}}", "DBSnapshotType": "{{ParseInput.DBSnapshotType}}", }, "TargetLocations": [ { "Accounts": [ "{{ParseInput.RemediationAccount}}", ], "ExecutionRoleName": "{{RemediationRoleName}}", "Regions": [ "{{ParseInput.RemediationRegion}}", ], }, ], }, "name": "Remediation", "nextStep": "UpdateFinding", }, { "action": "aws:executeAwsApi", "description": "Update finding", "inputs": { "Api": "BatchUpdateFindings", "FindingIdentifiers": [ { "Id": "{{ParseInput.FindingId}}", "ProductArn": "{{ParseInput.ProductArn}}", }, ], "Note": { "Text": "RDS DB Snapshot modified to private", "UpdatedBy": "ASR-AFSBP_1.0.0_RDS.1", }, "Service": "securityhub", "Workflow": { "Status": "RESOLVED", }, }, "isEnd": true, "name": "UpdateFinding", }, ], "outputs": [ "Remediation.Output", "ParseInput.AffectedObject", ], "parameters": { "AutomationAssumeRole": { "allowedPattern": "^arn:(?:aws|aws-us-gov|aws-cn):iam::\\d{12}:role/[\\w+=,.@-]+$", "description": "(Required) The ARN of the role that allows Automation to perform the actions on your behalf.", "type": "String", }, "Finding": { "description": "The input from the Orchestrator Step function for the RDS.1 finding", "type": "StringMap", }, "RemediationRoleName": { "allowedPattern": "^[\\w+=,.@-]+$", "default": "SO0111-MakeRDSSnapshotPrivate", "type": "String", }, }, "schemaVersion": "0.3", }, "DocumentFormat": "YAML", "DocumentType": "Automation", "Name": "ASR-AFSBP_1.0.0_RDS.1", "UpdateMethod": "NewVersion", }, "Type": "AWS::SSM::Document", }, "CreateWait0": { "DeletionPolicy": "Delete", "Properties": { "CreateIntervalSeconds": 1, "DeleteIntervalSeconds": 0, "DocumentPropertiesHash": "897540159cc11ecdc5d175eee99cc91041790b69ca60e8f6f1ae5cd3b72084be", "ServiceToken": { "Ref": "WaitProviderServiceToken", }, "UpdateIntervalSeconds": 1, }, "Type": "Custom::Wait", "UpdateReplacePolicy": "Delete", }, "DeletWait0": { "DeletionPolicy": "Delete", "DependsOn": [ "Gate0", ], "Properties": { "CreateIntervalSeconds": 0, "DeleteIntervalSeconds": 0.5, "DocumentPropertiesHash": "897540159cc11ecdc5d175eee99cc91041790b69ca60e8f6f1ae5cd3b72084be", "ServiceToken": { "Ref": "WaitProviderServiceToken", }, "UpdateIntervalSeconds": 0, }, "Type": "Custom::Wait", "UpdateReplacePolicy": "Delete", }, "Gate0": { "Metadata": { "ControlAFSBPEC21Ready": { "Fn::If": [ "EnableEC21Condition", { "Ref": "ControlAFSBPEC21", }, "", ], }, "ControlAFSBPLambda1Ready": { "Fn::If": [ "EnableLambda1Condition", { "Ref": "ControlAFSBPLambda1", }, "", ], }, "ControlAFSBPRDS1Ready": { "Fn::If": [ "EnableRDS1Condition", { "Ref": "ControlAFSBPRDS1", }, "", ], }, }, "Type": "AWS::CloudFormation::WaitConditionHandle", }, }, } `; exports[`Primary Stack - AFSBP 1`] = ` { "Description": "test;", "Mappings": { "SourceCode": { "General": { "KeyPrefix": "aws-security-hub-automated-response-and-remediation/v1.1.1", "S3Bucket": "sharrbukkit", }, }, }, "Parameters": { "AFSBP100Example1AutoTrigger": { "AllowedValues": [ "ENABLED", "DISABLED", ], "Default": "DISABLED", "Description": "This will fully enable automated remediation for AFSBP 1.0.0 Example.1", "Type": "String", }, "AFSBP100Example3AutoTrigger": { "AllowedValues": [ "ENABLED", "DISABLED", ], "Default": "DISABLED", "Description": "This will fully enable automated remediation for AFSBP 1.0.0 Example.3", "Type": "String", }, "AFSBP100Example5AutoTrigger": { "AllowedValues": [ "ENABLED", "DISABLED", ], "Default": "DISABLED", "Description": "This will fully enable automated remediation for AFSBP 1.0.0 Example.5", "Type": "String", }, "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter": { "Default": "/Solutions/SO0111/OrchestratorArn", "Type": "AWS::SSM::Parameter::Value", }, }, "Resources": { "AFSBPExample1AutoEventRuleC2A31DE2": { "Properties": { "Description": "Remediate AFSBP 1.0.0 Example.1 automatic remediation trigger event rule.", "EventPattern": { "detail": { "findings": { "Compliance": { "Status": [ "FAILED", "WARNING", ], }, "GeneratorId": [ "aws-foundational-security-best-practices/v/1.0.0/Example.1", ], "RecordState": [ "ACTIVE", ], "Workflow": { "Status": [ "NEW", ], }, }, }, "detail-type": [ "Security Hub Findings - Imported", ], "source": [ "aws.securityhub", ], }, "Name": "AFSBP_1.0.0_Example.1_AutoTrigger", "State": { "Ref": "AFSBP100Example1AutoTrigger", }, "Targets": [ { "Arn": { "Ref": "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter", }, "Id": "Target0", "RoleArn": { "Fn::GetAtt": [ "AFSBPExample1EventsRuleRole2EAEBD38", "Arn", ], }, }, ], }, "Type": "AWS::Events::Rule", }, "AFSBPExample1EventsRuleRole2EAEBD38": { "Properties": { "AssumeRolePolicyDocument": { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": "events.amazonaws.com", }, }, ], "Version": "2012-10-17", }, }, "Type": "AWS::IAM::Role", }, "AFSBPExample1EventsRuleRoleDefaultPolicy7C237931": { "Properties": { "PolicyDocument": { "Statement": [ { "Action": "states:StartExecution", "Effect": "Allow", "Resource": { "Ref": "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter", }, }, ], "Version": "2012-10-17", }, "PolicyName": "AFSBPExample1EventsRuleRoleDefaultPolicy7C237931", "Roles": [ { "Ref": "AFSBPExample1EventsRuleRole2EAEBD38", }, ], }, "Type": "AWS::IAM::Policy", }, "AFSBPExample3AutoEventRule804387B8": { "Properties": { "Description": "Remediate AFSBP 1.0.0 Example.3 automatic remediation trigger event rule.", "EventPattern": { "detail": { "findings": { "Compliance": { "Status": [ "FAILED", "WARNING", ], }, "GeneratorId": [ "aws-foundational-security-best-practices/v/1.0.0/Example.3", ], "RecordState": [ "ACTIVE", ], "Workflow": { "Status": [ "NEW", ], }, }, }, "detail-type": [ "Security Hub Findings - Imported", ], "source": [ "aws.securityhub", ], }, "Name": "AFSBP_1.0.0_Example.3_AutoTrigger", "State": { "Ref": "AFSBP100Example3AutoTrigger", }, "Targets": [ { "Arn": { "Ref": "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter", }, "Id": "Target0", "RoleArn": { "Fn::GetAtt": [ "AFSBPExample3EventsRuleRole5956A03B", "Arn", ], }, }, ], }, "Type": "AWS::Events::Rule", }, "AFSBPExample3EventsRuleRole5956A03B": { "Properties": { "AssumeRolePolicyDocument": { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": "events.amazonaws.com", }, }, ], "Version": "2012-10-17", }, }, "Type": "AWS::IAM::Role", }, "AFSBPExample3EventsRuleRoleDefaultPolicy6964C066": { "Properties": { "PolicyDocument": { "Statement": [ { "Action": "states:StartExecution", "Effect": "Allow", "Resource": { "Ref": "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter", }, }, ], "Version": "2012-10-17", }, "PolicyName": "AFSBPExample3EventsRuleRoleDefaultPolicy6964C066", "Roles": [ { "Ref": "AFSBPExample3EventsRuleRole5956A03B", }, ], }, "Type": "AWS::IAM::Policy", }, "AFSBPExample5AutoEventRuleD0EDB507": { "Properties": { "Description": "Remediate AFSBP 1.0.0 Example.5 automatic remediation trigger event rule.", "EventPattern": { "detail": { "findings": { "Compliance": { "Status": [ "FAILED", "WARNING", ], }, "GeneratorId": [ "aws-foundational-security-best-practices/v/1.0.0/Example.5", ], "RecordState": [ "ACTIVE", ], "Workflow": { "Status": [ "NEW", ], }, }, }, "detail-type": [ "Security Hub Findings - Imported", ], "source": [ "aws.securityhub", ], }, "Name": "AFSBP_1.0.0_Example.5_AutoTrigger", "State": { "Ref": "AFSBP100Example5AutoTrigger", }, "Targets": [ { "Arn": { "Ref": "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter", }, "Id": "Target0", "RoleArn": { "Fn::GetAtt": [ "AFSBPExample5EventsRuleRole80D6194D", "Arn", ], }, }, ], }, "Type": "AWS::Events::Rule", }, "AFSBPExample5EventsRuleRole80D6194D": { "Properties": { "AssumeRolePolicyDocument": { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": "events.amazonaws.com", }, }, ], "Version": "2012-10-17", }, }, "Type": "AWS::IAM::Role", }, "AFSBPExample5EventsRuleRoleDefaultPolicy34C14018": { "Properties": { "PolicyDocument": { "Statement": [ { "Action": "states:StartExecution", "Effect": "Allow", "Resource": { "Ref": "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter", }, }, ], "Version": "2012-10-17", }, "PolicyName": "AFSBPExample5EventsRuleRoleDefaultPolicy34C14018", "Roles": [ { "Ref": "AFSBPExample5EventsRuleRole80D6194D", }, ], }, "Type": "AWS::IAM::Policy", }, "AFSBPShortNameF737916D": { "Properties": { "Description": "Provides a short (1-12) character abbreviation for the standard.", "Name": "/Solutions/SO0111/aws-foundational-security-best-practices/1.0.0/shortname", "Type": "String", "Value": "AFSBP", }, "Type": "AWS::SSM::Parameter", }, "StandardVersionCB2C6951": { "Properties": { "Description": "This parameter controls whether the SHARR step function will process findings for this version of the standard.", "Name": "/Solutions/SO0111/aws-foundational-security-best-practices/1.0.0/status", "Type": "String", "Value": "enabled", }, "Type": "AWS::SSM::Parameter", }, }, } `;