// Jest Snapshot v1, https://goo.gl/fbAQLP exports[`default stack 1`] = ` { "Description": "test;", "Mappings": { "SourceCode": { "General": { "KeyPrefix": "aws-security-hub-automated-response-and-remediation/v1.1.1", "S3Bucket": "sharrbukkit", }, }, }, "Parameters": { "PCI321PCIAutoScaling1AutoTrigger": { "AllowedValues": [ "ENABLED", "DISABLED", ], "Default": "DISABLED", "Description": "This will fully enable automated remediation for PCI 3.2.1 PCI.AutoScaling.1", "Type": "String", }, "PCI321PCIEC26AutoTrigger": { "AllowedValues": [ "ENABLED", "DISABLED", ], "Default": "DISABLED", "Description": "This will fully enable automated remediation for PCI 3.2.1 PCI.EC2.6", "Type": "String", }, "PCI321PCIIAM8AutoTrigger": { "AllowedValues": [ "ENABLED", "DISABLED", ], "Default": "DISABLED", "Description": "This will fully enable automated remediation for PCI 3.2.1 PCI.IAM.8", "Type": "String", }, "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter": { "Default": "/Solutions/SO0111/OrchestratorArn", "Type": "AWS::SSM::Parameter::Value", }, }, "Resources": { "PCIPCIAutoScaling1AutoEventRuleCDFEB9FF": { "Properties": { "Description": "Remediate PCI 3.2.1 PCI.AutoScaling.1 automatic remediation trigger event rule.", "EventPattern": { "detail": { "findings": { "Compliance": { "Status": [ "FAILED", "WARNING", ], }, "GeneratorId": [ "pci-dss/v/3.2.1/PCI.AutoScaling.1", ], "RecordState": [ "ACTIVE", ], "Workflow": { "Status": [ "NEW", ], }, }, }, "detail-type": [ "Security Hub Findings - Imported", ], "source": [ "aws.securityhub", ], }, "Name": "PCI_3.2.1_PCI.AutoScaling.1_AutoTrigger", "State": { "Ref": "PCI321PCIAutoScaling1AutoTrigger", }, "Targets": [ { "Arn": { "Ref": "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter", }, "Id": "Target0", "RoleArn": { "Fn::GetAtt": [ "PCIPCIAutoScaling1EventsRuleRole3283761D", "Arn", ], }, }, ], }, "Type": "AWS::Events::Rule", }, "PCIPCIAutoScaling1EventsRuleRole3283761D": { "Properties": { "AssumeRolePolicyDocument": { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": "events.amazonaws.com", }, }, ], "Version": "2012-10-17", }, }, "Type": "AWS::IAM::Role", }, "PCIPCIAutoScaling1EventsRuleRoleDefaultPolicy7F317AE9": { "Properties": { "PolicyDocument": { "Statement": [ { "Action": "states:StartExecution", "Effect": "Allow", "Resource": { "Ref": "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter", }, }, ], "Version": "2012-10-17", }, "PolicyName": "PCIPCIAutoScaling1EventsRuleRoleDefaultPolicy7F317AE9", "Roles": [ { "Ref": "PCIPCIAutoScaling1EventsRuleRole3283761D", }, ], }, "Type": "AWS::IAM::Policy", }, "PCIPCIEC26AutoEventRule084B7A4B": { "Properties": { "Description": "Remediate PCI 3.2.1 PCI.EC2.6 automatic remediation trigger event rule.", "EventPattern": { "detail": { "findings": { "Compliance": { "Status": [ "FAILED", "WARNING", ], }, "GeneratorId": [ "pci-dss/v/3.2.1/PCI.EC2.6", ], "RecordState": [ "ACTIVE", ], "Workflow": { "Status": [ "NEW", ], }, }, }, "detail-type": [ "Security Hub Findings - Imported", ], "source": [ "aws.securityhub", ], }, "Name": "PCI_3.2.1_PCI.EC2.6_AutoTrigger", "State": { "Ref": "PCI321PCIEC26AutoTrigger", }, "Targets": [ { "Arn": { "Ref": "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter", }, "Id": "Target0", "RoleArn": { "Fn::GetAtt": [ "PCIPCIEC26EventsRuleRole8A61F75E", "Arn", ], }, }, ], }, "Type": "AWS::Events::Rule", }, "PCIPCIEC26EventsRuleRole8A61F75E": { "Properties": { "AssumeRolePolicyDocument": { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": "events.amazonaws.com", }, }, ], "Version": "2012-10-17", }, }, "Type": "AWS::IAM::Role", }, "PCIPCIEC26EventsRuleRoleDefaultPolicy22C238AF": { "Properties": { "PolicyDocument": { "Statement": [ { "Action": "states:StartExecution", "Effect": "Allow", "Resource": { "Ref": "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter", }, }, ], "Version": "2012-10-17", }, "PolicyName": "PCIPCIEC26EventsRuleRoleDefaultPolicy22C238AF", "Roles": [ { "Ref": "PCIPCIEC26EventsRuleRole8A61F75E", }, ], }, "Type": "AWS::IAM::Policy", }, "PCIPCIIAM8AutoEventRuleD053739F": { "Properties": { "Description": "Remediate PCI 3.2.1 PCI.IAM.8 automatic remediation trigger event rule.", "EventPattern": { "detail": { "findings": { "Compliance": { "Status": [ "FAILED", "WARNING", ], }, "GeneratorId": [ "pci-dss/v/3.2.1/PCI.IAM.8", ], "RecordState": [ "ACTIVE", ], "Workflow": { "Status": [ "NEW", ], }, }, }, "detail-type": [ "Security Hub Findings - Imported", ], "source": [ "aws.securityhub", ], }, "Name": "PCI_3.2.1_PCI.IAM.8_AutoTrigger", "State": { "Ref": "PCI321PCIIAM8AutoTrigger", }, "Targets": [ { "Arn": { "Ref": "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter", }, "Id": "Target0", "RoleArn": { "Fn::GetAtt": [ "PCIPCIIAM8EventsRuleRoleE8D97921", "Arn", ], }, }, ], }, "Type": "AWS::Events::Rule", }, "PCIPCIIAM8EventsRuleRoleDefaultPolicy8C6970ED": { "Properties": { "PolicyDocument": { "Statement": [ { "Action": "states:StartExecution", "Effect": "Allow", "Resource": { "Ref": "SsmParameterValueSolutionsSO0111OrchestratorArnC96584B6F00A464EAD1953AFF4B05118Parameter", }, }, ], "Version": "2012-10-17", }, "PolicyName": "PCIPCIIAM8EventsRuleRoleDefaultPolicy8C6970ED", "Roles": [ { "Ref": "PCIPCIIAM8EventsRuleRoleE8D97921", }, ], }, "Type": "AWS::IAM::Policy", }, "PCIPCIIAM8EventsRuleRoleE8D97921": { "Properties": { "AssumeRolePolicyDocument": { "Statement": [ { "Action": "sts:AssumeRole", "Effect": "Allow", "Principal": { "Service": "events.amazonaws.com", }, }, ], "Version": "2012-10-17", }, }, "Type": "AWS::IAM::Role", }, "PCIShortName090DB238": { "Properties": { "Description": "Provides a short (1-12) character abbreviation for the standard.", "Name": "/Solutions/SO0111/pci-dss/3.2.1/shortname", "Type": "String", "Value": "PCI", }, "Type": "AWS::SSM::Parameter", }, "StandardVersionCB2C6951": { "Properties": { "Description": "This parameter controls whether the SHARR step function will process findings for this version of the standard.", "Name": "/Solutions/SO0111/pci-dss/3.2.1/status", "Type": "String", "Value": "enabled", }, "Type": "AWS::SSM::Parameter", }, }, } `; exports[`default stack 2`] = ` { "Conditions": { "EnablePCIAutoScaling1Condition": { "Fn::Equals": [ { "Ref": "EnablePCIAutoScaling1", }, "Available", ], }, "EnablePCIEC26Condition": { "Fn::Equals": [ { "Ref": "EnablePCIEC26", }, "Available", ], }, "EnablePCIIAM8Condition": { "Fn::Equals": [ { "Ref": "EnablePCIIAM8", }, "Available", ], }, }, "Description": "test;", "Parameters": { "EnablePCIAutoScaling1": { "AllowedValues": [ "Available", "NOT Available", ], "Default": "Available", "Description": "Enable/disable availability of remediation for PCI version 3.2.1 Control PCI.AutoScaling.1 in Security Hub Console Custom Actions. If NOT Available the remediation cannot be triggered from the Security Hub console in the Security Hub Admin account.", "Type": "String", }, "EnablePCIEC26": { "AllowedValues": [ "Available", "NOT Available", ], "Default": "Available", "Description": "Enable/disable availability of remediation for PCI version 3.2.1 Control PCI.EC2.6 in Security Hub Console Custom Actions. If NOT Available the remediation cannot be triggered from the Security Hub console in the Security Hub Admin account.", "Type": "String", }, "EnablePCIIAM8": { "AllowedValues": [ "Available", "NOT Available", ], "Default": "Available", "Description": "Enable/disable availability of remediation for PCI version 3.2.1 Control PCI.IAM.8 in Security Hub Console Custom Actions. If NOT Available the remediation cannot be triggered from the Security Hub console in the Security Hub Admin account.", "Type": "String", }, "SecHubAdminAccount": { "AllowedPattern": "^\\d{12}$", "Description": "Admin account number", "Type": "String", }, "WaitProviderServiceToken": { "Type": "String", }, }, "Resources": { "ControlPCIPCIAutoScaling1": { "Condition": "EnablePCIAutoScaling1Condition", "DependsOn": [ "CreateWait0", ], "Properties": { "Content": { "assumeRole": "{{ AutomationAssumeRole }}", "description": "### Document Name - ASR-PCI_3.2.1_PCI.AutoScaling.1 ## What does this document do? This document enables ELB healthcheck on a given AutoScaling Group using the [UpdateAutoScalingGroup] API. ## Input Parameters * Finding: (Required) Security Hub finding details JSON * HealthCheckGracePeriod: (Optional) Health check grace period when ELB health check is Enabled Default: 30 seconds * AutomationAssumeRole: (Required) The ARN of the role that allows Automation to perform the actions on your behalf. ## Output Parameters * Remediation.Output ## Documentation Links * [PCI AutoScaling.1](https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-pci-controls.html#pcidss-autoscaling-1) ", "mainSteps": [ { "action": "aws:executeScript", "inputs": { "Handler": "parse_event", "InputPayload": { "Finding": "{{Finding}}", "expected_control_id": [ "PCI.AutoScaling.1", ], "parse_id_pattern": "^arn:(?:aws|aws-cn|aws-us-gov):autoscaling:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:autoScalingGroup:(?:[0-9a-fA-F]{8}-(?:[0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}):autoScalingGroupName/(.{1,255})$", }, "Runtime": "python3.8", "Script": "# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import re import json import boto3 from botocore.config import Config def connect_to_config(boto_config): return boto3.client('config', config=boto_config) def connect_to_ssm(boto_config): return boto3.client('ssm', config=boto_config) def get_solution_id(): return 'SO0111' def get_solution_version(): ssm = connect_to_ssm( Config( retries = { 'mode': 'standard' }, user_agent_extra = f'AwsSolution/{get_solution_id()}/unknown' ) ) solution_version = 'unknown' try: ssm_parm_value = ssm.get_parameter( Name=f'/Solutions/{get_solution_id()}/member-version' )['Parameter'].get('Value', 'unknown') solution_version = ssm_parm_value except Exception as e: print(e) print(f'ERROR getting solution version') return solution_version def get_shortname(long_name): short_name = { 'aws-foundational-security-best-practices': 'AFSBP', 'cis-aws-foundations-benchmark': 'CIS', 'pci-dss': 'PCI', 'security-control': 'SC' } return short_name.get(long_name, None) def get_config_rule(rule_name): boto_config = Config( retries = { 'mode': 'standard' }, user_agent_extra = f'AwsSolution/{get_solution_id()}/{get_solution_version()}' ) config_rule = None try: configsvc = connect_to_config(boto_config) config_rule = configsvc.describe_config_rules( ConfigRuleNames=[ rule_name ] ).get('ConfigRules', [])[0] except Exception as e: print(e) exit(f'ERROR getting config rule {rule_name}') return config_rule class FindingEvent: """ Finding object returns the parse fields from an input finding json object """ def _get_resource_id(self, parse_id_pattern, resource_index): identifier_raw = self.finding_json['Resources'][0]['Id'] self.resource_id = identifier_raw self.resource_id_matches = [] if parse_id_pattern: identifier_match = re.match( parse_id_pattern, identifier_raw ) if identifier_match: for group in range(1, len(identifier_match.groups())+1): self.resource_id_matches.append(identifier_match.group(group)) self.resource_id = identifier_match.group(resource_index) else: exit(f'ERROR: Invalid resource Id {identifier_raw}') def _get_sc_check(self): match_finding_id = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:'+ 'security-control/(.*)/finding/(?i:[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12})$', self.finding_json['Id'] ) if match_finding_id: self.standard_id = get_shortname('security-control') self.control_id = match_finding_id.group(1) return match_finding_id def _get_standard_info(self): match_finding_id = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:'+ 'subscription/(.*?)/v/(\\d+\\.\\d+\\.\\d+)/(.*)/finding/(?i:[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12})$', self.finding_json['Id'] ) if match_finding_id: self.standard_id = get_shortname(match_finding_id.group(1)) self.standard_version = match_finding_id.group(2) self.control_id = match_finding_id.group(3) else: match_sc_finding_id = self._get_sc_check() if not match_sc_finding_id: self.valid_finding = False self.invalid_finding_reason = f'Finding Id is invalid: {self.finding_json["Id"]}' def _get_aws_config_rule(self): # config_rule_id refers to the AWS Config Rule that produced the finding if "RelatedAWSResources:0/type" in self.finding_json['ProductFields'] and self.finding_json['ProductFields']['RelatedAWSResources:0/type'] == 'AWS::Config::ConfigRule': self.aws_config_rule_id = self.finding_json['ProductFields']['RelatedAWSResources:0/name'] self.aws_config_rule = get_config_rule(self.aws_config_rule_id) def _get_region_from_resource_id(self): check_for_region = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):[a-zA-Z0-9]+:([a-z]{2}(?:-gov)?-[a-z]+-\\d):.*:.*$', self.finding_json['Resources'][0]['Id'] ) if check_for_region: self.resource_region = check_for_region.group(1) def __init__(self, finding_json, parse_id_pattern, expected_control_id, resource_index): self.valid_finding = True self.resource_region = None self.control_id = None self.aws_config_rule_id = None self.aws_config_rule = {} """Populate fields""" # v1.5 self.finding_json = finding_json self._get_resource_id(parse_id_pattern, resource_index) # self.resource_id, self.resource_id_matches self._get_standard_info() # self.standard_id, self.standard_version, self.control_id # V1.4 self.account_id = self.finding_json.get('AwsAccountId', None) # deprecate - get Finding.AwsAccountId if not re.match(r'^\\d{12}$', self.account_id) and self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'AwsAccountId is invalid: {self.account_id}' self.finding_id = self.finding_json.get('Id', None) # deprecate self.product_arn = self.finding_json.get('ProductArn', None) if not re.match(r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:[a-z]{2}(?:-gov)?-[a-z]+-\\d::product/aws/securityhub$', self.product_arn): if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'ProductArn is invalid: {self.product_arn}' self.details = self.finding_json['Resources'][0].get('Details', {}) # Test mode is used with fabricated finding data to tell the # remediation runbook to run in test more (where supported) # Currently not widely-used and perhaps should be deprecated. self.testmode = bool('testmode' in self.finding_json) self.resource = self.finding_json['Resources'][0] self._get_region_from_resource_id() self._get_aws_config_rule() self.affected_object = {'Type': self.resource['Type'], 'Id': self.resource_id, 'OutputKey': 'Remediation.Output'} # Validate control_id if not self.control_id: if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'Finding Id is invalid: {self.finding_json["Id"]} - missing Control Id' elif self.control_id not in expected_control_id: # ControlId is the expected value if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'Control Id from input ({self.control_id}) does not match {str(expected_control_id)}' if not self.resource_id and self.valid_finding: self.valid_finding = False self.invalid_finding_reason = 'Resource Id is missing from the finding json Resources (Id)' if not self.valid_finding: # Error message and return error data msg = f'ERROR: {self.invalid_finding_reason}' exit(msg) def __str__(self): return json.dumps(self.__dict__) ''' MAIN ''' def parse_event(event, _): finding_event = FindingEvent(event['Finding'], event['parse_id_pattern'], event['expected_control_id'], event.get('resource_index', 1)) if not finding_event.valid_finding: exit('ERROR: Finding is not valid') return { "account_id": finding_event.account_id, "resource_id": finding_event.resource_id, "finding_id": finding_event.finding_id, # Deprecate v1.5.0+ "control_id": finding_event.control_id, "product_arn": finding_event.product_arn, # Deprecate v1.5.0+ "object": finding_event.affected_object, "matches": finding_event.resource_id_matches, "details": finding_event.details, # Deprecate v1.5.0+ "testmode": finding_event.testmode, # Deprecate v1.5.0+ "resource": finding_event.resource, "resource_region": finding_event.resource_region, "finding": finding_event.finding_json, "aws_config_rule": finding_event.aws_config_rule }", }, "isEnd": false, "name": "ParseInput", "outputs": [ { "Name": "AutoScalingGroupName", "Selector": "$.Payload.resource_id", "Type": "String", }, { "Name": "FindingId", "Selector": "$.Payload.finding_id", "Type": "String", }, { "Name": "ProductArn", "Selector": "$.Payload.product_arn", "Type": "String", }, { "Name": "AffectedObject", "Selector": "$.Payload.object", "Type": "StringMap", }, { "Name": "RemediationRegion", "Selector": "$.Payload.resource_region", "Type": "String", }, { "Name": "RemediationAccount", "Selector": "$.Payload.account_id", "Type": "String", }, ], }, { "action": "aws:executeAutomation", "inputs": { "DocumentName": "ASR-EnableAutoScalingGroupELBHealthCheck", "RuntimeParameters": { "AutoScalingGroupName": "{{ParseInput.AutoScalingGroupName}}", "AutomationAssumeRole": "arn:{{global:AWS_PARTITION}}:iam::{{global:ACCOUNT_ID}}:role/SO0111-EnableAutoScalingGroupELBHealthCheck", }, "TargetLocations": [ { "Accounts": [ "{{ParseInput.RemediationAccount}}", ], "ExecutionRoleName": "SO0111-EnableAutoScalingGroupELBHealthCheck", "Regions": [ "{{ParseInput.RemediationRegion}}", ], }, ], }, "isEnd": false, "name": "Remediation", }, { "action": "aws:executeAwsApi", "description": "Update finding", "inputs": { "Api": "BatchUpdateFindings", "FindingIdentifiers": [ { "Id": "{{ParseInput.FindingId}}", "ProductArn": "{{ParseInput.ProductArn}}", }, ], "Note": { "Text": "ASG health check type updated to ELB", "UpdatedBy": "ASR-PCI_3.2.1_AutoScaling.1", }, "Service": "securityhub", "Workflow": { "Status": "RESOLVED", }, }, "isEnd": true, "name": "UpdateFinding", }, ], "outputs": [ "Remediation.Output", "ParseInput.AffectedObject", ], "parameters": { "AutomationAssumeRole": { "allowedPattern": "^arn:(?:aws|aws-us-gov|aws-cn):iam::\\d{12}:role/[\\w+=,.@-]+$", "description": "(Required) The ARN of the role that allows Automation to perform the actions on your behalf.", "type": "String", }, "Finding": { "description": "The input from the Orchestrator Step function for the PCI.AutoScaling.1 finding", "type": "StringMap", }, "HealthCheckGracePeriod": { "default": 30, "description": "ELB Health Check Grace Period", "type": "Integer", }, }, "schemaVersion": "0.3", }, "DocumentFormat": "YAML", "DocumentType": "Automation", "Name": "ASR-PCI_3.2.1_PCI.AutoScaling.1", "UpdateMethod": "NewVersion", }, "Type": "AWS::SSM::Document", }, "ControlPCIPCIEC26": { "Condition": "EnablePCIEC26Condition", "DependsOn": [ "CreateWait0", ], "Properties": { "Content": { "assumeRole": "{{ AutomationAssumeRole }}", "description": "### Document Name - ASR-PCI_3.2.1_PCI.EC2.6 ## What does this document do? Enables VPC Flow Logs for a VPC ## Input Parameters * Finding: (Required) Security Hub finding details JSON * AutomationAssumeRole: (Required) The ARN of the role that allows Automation to perform the actions on your behalf. ## Output Parameters * Remediation.Output - Remediation results ## Documentation Links * [PCI EC2.6](https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-pci-controls.html#pcidss-ec2-6) ", "mainSteps": [ { "action": "aws:executeScript", "inputs": { "Handler": "parse_event", "InputPayload": { "Finding": "{{Finding}}", "expected_control_id": [ "PCI.EC2.6", ], "parse_id_pattern": "^arn:(?:aws|aws-cn|aws-us-gov):ec2:.*:\\d{12}:vpc/(vpc-[0-9a-f]{8,17}$)", }, "Runtime": "python3.8", "Script": "# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import re import json import boto3 from botocore.config import Config def connect_to_config(boto_config): return boto3.client('config', config=boto_config) def connect_to_ssm(boto_config): return boto3.client('ssm', config=boto_config) def get_solution_id(): return 'SO0111' def get_solution_version(): ssm = connect_to_ssm( Config( retries = { 'mode': 'standard' }, user_agent_extra = f'AwsSolution/{get_solution_id()}/unknown' ) ) solution_version = 'unknown' try: ssm_parm_value = ssm.get_parameter( Name=f'/Solutions/{get_solution_id()}/member-version' )['Parameter'].get('Value', 'unknown') solution_version = ssm_parm_value except Exception as e: print(e) print(f'ERROR getting solution version') return solution_version def get_shortname(long_name): short_name = { 'aws-foundational-security-best-practices': 'AFSBP', 'cis-aws-foundations-benchmark': 'CIS', 'pci-dss': 'PCI', 'security-control': 'SC' } return short_name.get(long_name, None) def get_config_rule(rule_name): boto_config = Config( retries = { 'mode': 'standard' }, user_agent_extra = f'AwsSolution/{get_solution_id()}/{get_solution_version()}' ) config_rule = None try: configsvc = connect_to_config(boto_config) config_rule = configsvc.describe_config_rules( ConfigRuleNames=[ rule_name ] ).get('ConfigRules', [])[0] except Exception as e: print(e) exit(f'ERROR getting config rule {rule_name}') return config_rule class FindingEvent: """ Finding object returns the parse fields from an input finding json object """ def _get_resource_id(self, parse_id_pattern, resource_index): identifier_raw = self.finding_json['Resources'][0]['Id'] self.resource_id = identifier_raw self.resource_id_matches = [] if parse_id_pattern: identifier_match = re.match( parse_id_pattern, identifier_raw ) if identifier_match: for group in range(1, len(identifier_match.groups())+1): self.resource_id_matches.append(identifier_match.group(group)) self.resource_id = identifier_match.group(resource_index) else: exit(f'ERROR: Invalid resource Id {identifier_raw}') def _get_sc_check(self): match_finding_id = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:'+ 'security-control/(.*)/finding/(?i:[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12})$', self.finding_json['Id'] ) if match_finding_id: self.standard_id = get_shortname('security-control') self.control_id = match_finding_id.group(1) return match_finding_id def _get_standard_info(self): match_finding_id = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:'+ 'subscription/(.*?)/v/(\\d+\\.\\d+\\.\\d+)/(.*)/finding/(?i:[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12})$', self.finding_json['Id'] ) if match_finding_id: self.standard_id = get_shortname(match_finding_id.group(1)) self.standard_version = match_finding_id.group(2) self.control_id = match_finding_id.group(3) else: match_sc_finding_id = self._get_sc_check() if not match_sc_finding_id: self.valid_finding = False self.invalid_finding_reason = f'Finding Id is invalid: {self.finding_json["Id"]}' def _get_aws_config_rule(self): # config_rule_id refers to the AWS Config Rule that produced the finding if "RelatedAWSResources:0/type" in self.finding_json['ProductFields'] and self.finding_json['ProductFields']['RelatedAWSResources:0/type'] == 'AWS::Config::ConfigRule': self.aws_config_rule_id = self.finding_json['ProductFields']['RelatedAWSResources:0/name'] self.aws_config_rule = get_config_rule(self.aws_config_rule_id) def _get_region_from_resource_id(self): check_for_region = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):[a-zA-Z0-9]+:([a-z]{2}(?:-gov)?-[a-z]+-\\d):.*:.*$', self.finding_json['Resources'][0]['Id'] ) if check_for_region: self.resource_region = check_for_region.group(1) def __init__(self, finding_json, parse_id_pattern, expected_control_id, resource_index): self.valid_finding = True self.resource_region = None self.control_id = None self.aws_config_rule_id = None self.aws_config_rule = {} """Populate fields""" # v1.5 self.finding_json = finding_json self._get_resource_id(parse_id_pattern, resource_index) # self.resource_id, self.resource_id_matches self._get_standard_info() # self.standard_id, self.standard_version, self.control_id # V1.4 self.account_id = self.finding_json.get('AwsAccountId', None) # deprecate - get Finding.AwsAccountId if not re.match(r'^\\d{12}$', self.account_id) and self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'AwsAccountId is invalid: {self.account_id}' self.finding_id = self.finding_json.get('Id', None) # deprecate self.product_arn = self.finding_json.get('ProductArn', None) if not re.match(r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:[a-z]{2}(?:-gov)?-[a-z]+-\\d::product/aws/securityhub$', self.product_arn): if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'ProductArn is invalid: {self.product_arn}' self.details = self.finding_json['Resources'][0].get('Details', {}) # Test mode is used with fabricated finding data to tell the # remediation runbook to run in test more (where supported) # Currently not widely-used and perhaps should be deprecated. self.testmode = bool('testmode' in self.finding_json) self.resource = self.finding_json['Resources'][0] self._get_region_from_resource_id() self._get_aws_config_rule() self.affected_object = {'Type': self.resource['Type'], 'Id': self.resource_id, 'OutputKey': 'Remediation.Output'} # Validate control_id if not self.control_id: if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'Finding Id is invalid: {self.finding_json["Id"]} - missing Control Id' elif self.control_id not in expected_control_id: # ControlId is the expected value if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'Control Id from input ({self.control_id}) does not match {str(expected_control_id)}' if not self.resource_id and self.valid_finding: self.valid_finding = False self.invalid_finding_reason = 'Resource Id is missing from the finding json Resources (Id)' if not self.valid_finding: # Error message and return error data msg = f'ERROR: {self.invalid_finding_reason}' exit(msg) def __str__(self): return json.dumps(self.__dict__) ''' MAIN ''' def parse_event(event, _): finding_event = FindingEvent(event['Finding'], event['parse_id_pattern'], event['expected_control_id'], event.get('resource_index', 1)) if not finding_event.valid_finding: exit('ERROR: Finding is not valid') return { "account_id": finding_event.account_id, "resource_id": finding_event.resource_id, "finding_id": finding_event.finding_id, # Deprecate v1.5.0+ "control_id": finding_event.control_id, "product_arn": finding_event.product_arn, # Deprecate v1.5.0+ "object": finding_event.affected_object, "matches": finding_event.resource_id_matches, "details": finding_event.details, # Deprecate v1.5.0+ "testmode": finding_event.testmode, # Deprecate v1.5.0+ "resource": finding_event.resource, "resource_region": finding_event.resource_region, "finding": finding_event.finding_json, "aws_config_rule": finding_event.aws_config_rule }", }, "name": "ParseInput", "outputs": [ { "Name": "VPC", "Selector": "$.Payload.resource_id", "Type": "String", }, { "Name": "FindingId", "Selector": "$.Payload.finding_id", "Type": "String", }, { "Name": "ProductArn", "Selector": "$.Payload.product_arn", "Type": "String", }, { "Name": "AffectedObject", "Selector": "$.Payload.object", "Type": "StringMap", }, { "Name": "RemediationRegion", "Selector": "$.Payload.resource_region", "Type": "String", }, { "Name": "RemediationAccount", "Selector": "$.Payload.account_id", "Type": "String", }, ], }, { "action": "aws:executeAutomation", "inputs": { "DocumentName": "ASR-EnableVPCFlowLogs", "RuntimeParameters": { "AutomationAssumeRole": "arn:{{global:AWS_PARTITION}}:iam::{{global:ACCOUNT_ID}}:role/{{RemediationRoleName}}", "RemediationRole": "arn:{{global:AWS_PARTITION}}:iam::{{global:ACCOUNT_ID}}:role/SO0111-EnableVPCFlowLogs-remediationRole", "VPC": "{{ParseInput.VPC}}", }, "TargetLocations": [ { "Accounts": [ "{{ParseInput.RemediationAccount}}", ], "ExecutionRoleName": "{{RemediationRoleName}}", "Regions": [ "{{ParseInput.RemediationRegion}}", ], }, ], }, "isEnd": false, "name": "Remediation", }, { "action": "aws:executeAwsApi", "description": "Update finding", "inputs": { "Api": "BatchUpdateFindings", "FindingIdentifiers": [ { "Id": "{{ParseInput.FindingId}}", "ProductArn": "{{ParseInput.ProductArn}}", }, ], "Note": { "Text": "Enabled VPC Flow Logs for {{ParseInput.VPC}}", "UpdatedBy": "ASR-PCI_3.2.1_PCI.EC2.6", }, "Service": "securityhub", "Workflow": { "Status": "RESOLVED", }, }, "isEnd": true, "name": "UpdateFinding", }, ], "outputs": [ "ParseInput.AffectedObject", "Remediation.Output", ], "parameters": { "AutomationAssumeRole": { "allowedPattern": "^arn:(?:aws|aws-us-gov|aws-cn):iam::\\d{12}:role/[\\w+=,.@-]+$", "description": "(Required) The ARN of the role that allows Automation to perform the actions on your behalf.", "type": "String", }, "Finding": { "description": "The input from the Orchestrator Step function for the PCI.EC2.6 finding", "type": "StringMap", }, "RemediationRoleName": { "allowedPattern": "^[\\w+=,.@-]+$", "default": "SO0111-EnableVPCFlowLogs", "type": "String", }, }, "schemaVersion": "0.3", }, "DocumentFormat": "YAML", "DocumentType": "Automation", "Name": "ASR-PCI_3.2.1_PCI.EC2.6", "UpdateMethod": "NewVersion", }, "Type": "AWS::SSM::Document", }, "ControlPCIPCIIAM8": { "Condition": "EnablePCIIAM8Condition", "DependsOn": [ "CreateWait0", ], "Properties": { "Content": { "assumeRole": "{{ AutomationAssumeRole }}", "description": "### Document Name - ASR-PCI_3.2.1_PCI.IAM.8 ## What does this document do? This document establishes a default password policy. ## Security Standards and Controls * CIS 1.5 - 1.11 * AFSBP IAM.7 * PCI IAM.8 ## Input Parameters * Finding: (Required) Security Hub finding details JSON * AutomationAssumeRole: (Required) The ARN of the role that allows Automation to perform the actions on your behalf. ## Output Parameters * Remediation.Output ## Documentation Links * [PCI IAM.8](https://docs.aws.amazon.com/securityhub/latest/userguide/securityhub-pci-controls.html#pcidss-iam-8) ", "mainSteps": [ { "action": "aws:executeScript", "inputs": { "Handler": "parse_event", "InputPayload": { "Finding": "{{Finding}}", "expected_control_id": [ "PCI.IAM.8", ], "parse_id_pattern": "", }, "Runtime": "python3.8", "Script": "# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. # SPDX-License-Identifier: Apache-2.0 import re import json import boto3 from botocore.config import Config def connect_to_config(boto_config): return boto3.client('config', config=boto_config) def connect_to_ssm(boto_config): return boto3.client('ssm', config=boto_config) def get_solution_id(): return 'SO0111' def get_solution_version(): ssm = connect_to_ssm( Config( retries = { 'mode': 'standard' }, user_agent_extra = f'AwsSolution/{get_solution_id()}/unknown' ) ) solution_version = 'unknown' try: ssm_parm_value = ssm.get_parameter( Name=f'/Solutions/{get_solution_id()}/member-version' )['Parameter'].get('Value', 'unknown') solution_version = ssm_parm_value except Exception as e: print(e) print(f'ERROR getting solution version') return solution_version def get_shortname(long_name): short_name = { 'aws-foundational-security-best-practices': 'AFSBP', 'cis-aws-foundations-benchmark': 'CIS', 'pci-dss': 'PCI', 'security-control': 'SC' } return short_name.get(long_name, None) def get_config_rule(rule_name): boto_config = Config( retries = { 'mode': 'standard' }, user_agent_extra = f'AwsSolution/{get_solution_id()}/{get_solution_version()}' ) config_rule = None try: configsvc = connect_to_config(boto_config) config_rule = configsvc.describe_config_rules( ConfigRuleNames=[ rule_name ] ).get('ConfigRules', [])[0] except Exception as e: print(e) exit(f'ERROR getting config rule {rule_name}') return config_rule class FindingEvent: """ Finding object returns the parse fields from an input finding json object """ def _get_resource_id(self, parse_id_pattern, resource_index): identifier_raw = self.finding_json['Resources'][0]['Id'] self.resource_id = identifier_raw self.resource_id_matches = [] if parse_id_pattern: identifier_match = re.match( parse_id_pattern, identifier_raw ) if identifier_match: for group in range(1, len(identifier_match.groups())+1): self.resource_id_matches.append(identifier_match.group(group)) self.resource_id = identifier_match.group(resource_index) else: exit(f'ERROR: Invalid resource Id {identifier_raw}') def _get_sc_check(self): match_finding_id = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:'+ 'security-control/(.*)/finding/(?i:[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12})$', self.finding_json['Id'] ) if match_finding_id: self.standard_id = get_shortname('security-control') self.control_id = match_finding_id.group(1) return match_finding_id def _get_standard_info(self): match_finding_id = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:(?:[a-z]{2}(?:-gov)?-[a-z]+-\\d):\\d{12}:'+ 'subscription/(.*?)/v/(\\d+\\.\\d+\\.\\d+)/(.*)/finding/(?i:[0-9a-f]{8}-(?:[0-9a-f]{4}-){3}[0-9a-f]{12})$', self.finding_json['Id'] ) if match_finding_id: self.standard_id = get_shortname(match_finding_id.group(1)) self.standard_version = match_finding_id.group(2) self.control_id = match_finding_id.group(3) else: match_sc_finding_id = self._get_sc_check() if not match_sc_finding_id: self.valid_finding = False self.invalid_finding_reason = f'Finding Id is invalid: {self.finding_json["Id"]}' def _get_aws_config_rule(self): # config_rule_id refers to the AWS Config Rule that produced the finding if "RelatedAWSResources:0/type" in self.finding_json['ProductFields'] and self.finding_json['ProductFields']['RelatedAWSResources:0/type'] == 'AWS::Config::ConfigRule': self.aws_config_rule_id = self.finding_json['ProductFields']['RelatedAWSResources:0/name'] self.aws_config_rule = get_config_rule(self.aws_config_rule_id) def _get_region_from_resource_id(self): check_for_region = re.match( r'^arn:(?:aws|aws-cn|aws-us-gov):[a-zA-Z0-9]+:([a-z]{2}(?:-gov)?-[a-z]+-\\d):.*:.*$', self.finding_json['Resources'][0]['Id'] ) if check_for_region: self.resource_region = check_for_region.group(1) def __init__(self, finding_json, parse_id_pattern, expected_control_id, resource_index): self.valid_finding = True self.resource_region = None self.control_id = None self.aws_config_rule_id = None self.aws_config_rule = {} """Populate fields""" # v1.5 self.finding_json = finding_json self._get_resource_id(parse_id_pattern, resource_index) # self.resource_id, self.resource_id_matches self._get_standard_info() # self.standard_id, self.standard_version, self.control_id # V1.4 self.account_id = self.finding_json.get('AwsAccountId', None) # deprecate - get Finding.AwsAccountId if not re.match(r'^\\d{12}$', self.account_id) and self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'AwsAccountId is invalid: {self.account_id}' self.finding_id = self.finding_json.get('Id', None) # deprecate self.product_arn = self.finding_json.get('ProductArn', None) if not re.match(r'^arn:(?:aws|aws-cn|aws-us-gov):securityhub:[a-z]{2}(?:-gov)?-[a-z]+-\\d::product/aws/securityhub$', self.product_arn): if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'ProductArn is invalid: {self.product_arn}' self.details = self.finding_json['Resources'][0].get('Details', {}) # Test mode is used with fabricated finding data to tell the # remediation runbook to run in test more (where supported) # Currently not widely-used and perhaps should be deprecated. self.testmode = bool('testmode' in self.finding_json) self.resource = self.finding_json['Resources'][0] self._get_region_from_resource_id() self._get_aws_config_rule() self.affected_object = {'Type': self.resource['Type'], 'Id': self.resource_id, 'OutputKey': 'Remediation.Output'} # Validate control_id if not self.control_id: if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'Finding Id is invalid: {self.finding_json["Id"]} - missing Control Id' elif self.control_id not in expected_control_id: # ControlId is the expected value if self.valid_finding: self.valid_finding = False self.invalid_finding_reason = f'Control Id from input ({self.control_id}) does not match {str(expected_control_id)}' if not self.resource_id and self.valid_finding: self.valid_finding = False self.invalid_finding_reason = 'Resource Id is missing from the finding json Resources (Id)' if not self.valid_finding: # Error message and return error data msg = f'ERROR: {self.invalid_finding_reason}' exit(msg) def __str__(self): return json.dumps(self.__dict__) ''' MAIN ''' def parse_event(event, _): finding_event = FindingEvent(event['Finding'], event['parse_id_pattern'], event['expected_control_id'], event.get('resource_index', 1)) if not finding_event.valid_finding: exit('ERROR: Finding is not valid') return { "account_id": finding_event.account_id, "resource_id": finding_event.resource_id, "finding_id": finding_event.finding_id, # Deprecate v1.5.0+ "control_id": finding_event.control_id, "product_arn": finding_event.product_arn, # Deprecate v1.5.0+ "object": finding_event.affected_object, "matches": finding_event.resource_id_matches, "details": finding_event.details, # Deprecate v1.5.0+ "testmode": finding_event.testmode, # Deprecate v1.5.0+ "resource": finding_event.resource, "resource_region": finding_event.resource_region, "finding": finding_event.finding_json, "aws_config_rule": finding_event.aws_config_rule }", }, "isEnd": false, "name": "ParseInput", "outputs": [ { "Name": "FindingId", "Selector": "$.Payload.finding_id", "Type": "String", }, { "Name": "ProductArn", "Selector": "$.Payload.product_arn", "Type": "String", }, { "Name": "AffectedObject", "Selector": "$.Payload.object", "Type": "StringMap", }, ], }, { "action": "aws:executeAutomation", "inputs": { "DocumentName": "ASR-SetIAMPasswordPolicy", "RuntimeParameters": { "AllowUsersToChangePassword": true, "AutomationAssumeRole": "arn:{{global:AWS_PARTITION}}:iam::{{global:ACCOUNT_ID}}:role/SO0111-SetIAMPasswordPolicy", "HardExpiry": true, "MaxPasswordAge": 90, "MinimumPasswordLength": 14, "PasswordReusePrevention": 24, "RequireLowercaseCharacters": true, "RequireNumbers": true, "RequireSymbols": true, "RequireUppercaseCharacters": true, }, }, "isEnd": false, "name": "Remediation", }, { "action": "aws:executeAwsApi", "description": "Update finding", "inputs": { "Api": "BatchUpdateFindings", "FindingIdentifiers": [ { "Id": "{{ParseInput.FindingId}}", "ProductArn": "{{ParseInput.ProductArn}}", }, ], "Note": { "Text": "Established a baseline password policy using the AWSConfigRemediation-SetIAMPasswordPolicy runbook.", "UpdatedBy": "ASR-PCI_3.2.1_IAM.8", }, "Service": "securityhub", "Workflow": { "Status": "RESOLVED", }, }, "isEnd": true, "name": "UpdateFinding", }, ], "outputs": [ "ParseInput.AffectedObject", "Remediation.Output", ], "parameters": { "AutomationAssumeRole": { "allowedPattern": "^arn:(?:aws|aws-us-gov|aws-cn):iam::\\d{12}:role/[\\w+=,.@-]+$", "description": "(Required) The ARN of the role that allows Automation to perform the actions on your behalf.", "type": "String", }, "Finding": { "description": "The input from the Orchestrator Step function for the PCI.IAM.8 finding", "type": "StringMap", }, }, "schemaVersion": "0.3", }, "DocumentFormat": "YAML", "DocumentType": "Automation", "Name": "ASR-PCI_3.2.1_PCI.IAM.8", "UpdateMethod": "NewVersion", }, "Type": "AWS::SSM::Document", }, "CreateWait0": { "DeletionPolicy": "Delete", "Properties": { "CreateIntervalSeconds": 1, "DeleteIntervalSeconds": 0, "DocumentPropertiesHash": "aa2cfc511d1d551f8f9af7857ff3116f745b335bbb687d28fa4001cc503aca03", "ServiceToken": { "Ref": "WaitProviderServiceToken", }, "UpdateIntervalSeconds": 1, }, "Type": "Custom::Wait", "UpdateReplacePolicy": "Delete", }, "DeletWait0": { "DeletionPolicy": "Delete", "DependsOn": [ "Gate0", ], "Properties": { "CreateIntervalSeconds": 0, "DeleteIntervalSeconds": 0.5, "DocumentPropertiesHash": "aa2cfc511d1d551f8f9af7857ff3116f745b335bbb687d28fa4001cc503aca03", "ServiceToken": { "Ref": "WaitProviderServiceToken", }, "UpdateIntervalSeconds": 0, }, "Type": "Custom::Wait", "UpdateReplacePolicy": "Delete", }, "Gate0": { "Metadata": { "ControlPCIPCIAutoScaling1Ready": { "Fn::If": [ "EnablePCIAutoScaling1Condition", { "Ref": "ControlPCIPCIAutoScaling1", }, "", ], }, "ControlPCIPCIEC26Ready": { "Fn::If": [ "EnablePCIEC26Condition", { "Ref": "ControlPCIPCIEC26", }, "", ], }, "ControlPCIPCIIAM8Ready": { "Fn::If": [ "EnablePCIIAM8Condition", { "Ref": "ControlPCIPCIIAM8", }, "", ], }, }, "Type": "AWS::CloudFormation::WaitConditionHandle", }, }, } `;