""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 """ import json from datetime import date from cfnlint.decode import convert_dict from cfnlint.helpers import FUNCTIONS_SINGLE from cfnlint.rules import CloudFormationLintRule, RuleMatch class Policy(CloudFormationLintRule): """Check if IAM Policy JSON is correct""" id = "E2507" shortdesc = "Check if IAM Policies are properly configured" description = "See if there elements inside an IAM policy are correct" source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-properties-iam-policy.html" tags = ["properties", "iam"] def __init__(self): """Init""" super().__init__() self.resource_exceptions = { "AWS::ECR::Repository": "RepositoryPolicyText", } self.resources_and_keys = { "AWS::ECR::Repository": "RepositoryPolicyText", "AWS::Elasticsearch::Domain": "AccessPolicies", "AWS::OpenSearchService::Domain": "AccessPolicies", "AWS::KMS::Key": "KeyPolicy", "AWS::S3::BucketPolicy": "PolicyDocument", "AWS::SNS::TopicPolicy": "PolicyDocument", "AWS::SQS::QueuePolicy": "PolicyDocument", } self.idp_and_keys = { "AWS::IAM::Group": "Policies", "AWS::IAM::ManagedPolicy": "PolicyDocument", "AWS::IAM::Policy": "PolicyDocument", "AWS::IAM::Role": "Policies", "AWS::IAM::User": "Policies", "AWS::SSO::PermissionSet": "InlinePolicy", } for resource_type in self.resources_and_keys: self.resource_property_types.append(resource_type) for resource_type in self.idp_and_keys: self.resource_property_types.append(resource_type) def check_policy_document( self, value, path, is_identity_policy, resource_exceptions, start_mark, end_mark ): """Check policy document""" matches = [] valid_keys = [ "Version", "Id", "Statement", ] valid_versions = [ "2012-10-17", "2008-10-17", date(2012, 10, 17), date(2008, 10, 17), ] if isinstance(value, str): try: value = convert_dict(json.loads(value), start_mark, end_mark) except Exception as ex: # pylint: disable=W0703,W0612 message = "IAM Policy Documents need to be JSON" matches.append(RuleMatch(path[:], message)) return matches if not isinstance(value, dict): message = "IAM Policy Documents needs to be JSON" matches.append(RuleMatch(path[:], message)) return matches for p_vs, p_p in value.items_safe(path[:], (dict)): for parent_key, parent_value in p_vs.items(): if parent_key not in valid_keys: message = f"IAM Policy key {parent_key} doesn't exist." matches.append(RuleMatch(path[:] + p_p + [parent_key], message)) if parent_key == "Version": if parent_value not in valid_versions: message = f'IAM Policy Version needs to be one of ({", ".join(map(str, ["2012-10-17", "2008-10-17"]))}).' matches.append(RuleMatch(p_p + [parent_key], message)) if parent_key == "Statement": if isinstance(parent_value, list): for i_s_v, i_s_p in parent_value.items_safe( p_p + ["Statement"], (dict) ): matches.extend( self._check_policy_statement( i_s_p, i_s_v, is_identity_policy, resource_exceptions, ) ) elif isinstance(parent_value, dict): for i_s_v, i_s_p in parent_value.items_safe( p_p + ["Statement"] ): matches.extend( self._check_policy_statement( i_s_p, i_s_v, is_identity_policy, resource_exceptions, ) ) else: message = "IAM Policy statement should be of list." matches.append(RuleMatch(p_p + [parent_key], message)) return matches def _check_policy_statement( self, branch, statement, is_identity_policy, resource_exceptions ): """Check statements""" matches = [] statement_valid_keys = [ "Action", "Condition", "Effect", "NotAction", "NotPrincipal", "NotResource", "Principal", "Resource", "Sid", ] for key, _ in statement.items(): if key not in statement_valid_keys: message = f"IAM Policy statement key {key} isn't valid" matches.append(RuleMatch(branch[:] + [key], message)) if "Effect" not in statement: message = "IAM Policy statement missing Effect" matches.append(RuleMatch(branch[:], message)) else: for effect, effect_path in statement.get_safe("Effect"): if isinstance(effect, str): if effect not in ["Allow", "Deny"]: message = "IAM Policy Effect should be Allow or Deny" matches.append(RuleMatch(branch[:] + effect_path, message)) if "Action" not in statement and "NotAction" not in statement: message = "IAM Policy statement missing Action or NotAction" matches.append(RuleMatch(branch[:], message)) if is_identity_policy: if "Principal" in statement or "NotPrincipal" in statement: message = "IAM Resource Policy statement shouldn't have Principal or NotPrincipal" matches.append(RuleMatch(branch[:], message)) else: if "Principal" not in statement and "NotPrincipal" not in statement: message = "IAM Resource Policy statement should have Principal or NotPrincipal" matches.append(RuleMatch(branch[:] + ["Principal"], message)) if not resource_exceptions: if "Resource" not in statement and "NotResource" not in statement: message = "IAM Policy statement missing Resource or NotResource" matches.append(RuleMatch(branch[:], message)) resources = statement.get("Resource", []) if isinstance(resources, str): resources = [resources] for index, resource in enumerate(resources): if isinstance(resource, dict): if len(resource) == 1: for k in resource.keys(): if k not in FUNCTIONS_SINGLE: message = ( "IAM Policy statement Resource incorrectly formatted" ) matches.append( RuleMatch(branch[:] + ["Resource", index], message) ) else: message = "IAM Policy statement Resource incorrectly formatted" matches.append(RuleMatch(branch[:] + ["Resource", index], message)) return matches def match_resource_properties(self, properties, resourcetype, path, cfn): """Check CloudFormation Properties""" matches = [] is_identity_policy = True if resourcetype in self.resources_and_keys: is_identity_policy = False key = None if resourcetype in self.resources_and_keys: key = self.resources_and_keys.get(resourcetype) else: key = self.idp_and_keys.get(resourcetype) if not key: # Key isn't defined return nothing return matches resource_exceptions = False if key == self.resource_exceptions.get(resourcetype): resource_exceptions = True other_keys = [] for key, value in self.resources_and_keys.items(): if value != "Policies": other_keys.append(key) for key, value in self.idp_and_keys.items(): if value != "Policies": other_keys.append(key) for key, value in properties.items(): if key == "Policies" and isinstance(value, list): for index, policy in enumerate(properties.get(key, [])): matches.extend( cfn.check_value( obj=policy, key="PolicyDocument", path=path[:] + ["Policies", index], check_value=self.check_policy_document, is_identity_policy=is_identity_policy, resource_exceptions=resource_exceptions, start_mark=key.start_mark, end_mark=key.end_mark, ) ) elif key in [ "KeyPolicy", "PolicyDocument", "RepositoryPolicyText", "AccessPolicies", "InlinePolicy", ]: matches.extend( cfn.check_value( obj=properties, key=key, path=path[:], check_value=self.check_policy_document, is_identity_policy=is_identity_policy, resource_exceptions=resource_exceptions, start_mark=key.start_mark, end_mark=key.end_mark, ) ) return matches