""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 """ import datetime import json import regex as re import cfnlint.helpers from cfnlint.helpers import RESOURCE_SPECS from cfnlint.rules import CloudFormationLintRule, RuleMatch class JsonSize(CloudFormationLintRule): """Check if JSON Object Size is within the specified length""" id = "E3502" shortdesc = "Check if a JSON Object is within size limits" description = "Validate properties that are JSON values so that their length is within the limits" source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/cloudformation-limits.html" tags = ["resources", "limits", "json"] def initialize(self, cfn): """Initialize the rule""" for resource_type_spec in RESOURCE_SPECS.get(cfn.regions[0]).get( "ResourceTypes" ): self.resource_property_types.append(resource_type_spec) for property_type_spec in RESOURCE_SPECS.get(cfn.regions[0]).get( "PropertyTypes" ): self.resource_sub_property_types.append(property_type_spec) def _serialize_date(self, obj): if isinstance(obj, datetime.date): return obj.isoformat() raise TypeError( f"Object of type {obj.__class__.__name__} is not JSON serializable" ) def check_value(self, value, path, prop, cfn, specs): """Check Role.AssumeRolePolicyDocument is within limits""" matches = [] # pylint: disable=too-many-return-statements def remove_functions(obj): """Replaces intrinsic functions with string""" if isinstance(obj, dict): new_obj = {} if len(obj) == 1: for k, v in obj.items(): if k in cfnlint.helpers.FUNCTIONS: if k == "Fn::Sub": if isinstance(v, str): return re.sub(r"\${.*}", "", v) if isinstance(v, list): return re.sub(r"\${.*}", "", v[0]) else: return "" else: new_obj[k] = remove_functions(v) return new_obj else: for k, v in obj.items(): new_obj[k] = remove_functions(v) return new_obj elif isinstance(obj, list): new_list = [] for v in obj: new_list.append(remove_functions(v)) return new_list return obj scenarios = cfn.get_object_without_nested_conditions(value, path) json_max_size = specs.get("JsonMax") for scenario in scenarios: j = remove_functions(scenario["Object"][prop]) if isinstance(j, str): try: j = json.loads(j) except: # pylint: disable=bare-except continue if ( len(json.dumps(j, separators=(",", ":"), default=self._serialize_date)) > json_max_size ): if scenario["Scenario"]: message = ( "{0} JSON text cannot be longer than {1} characters when {2}" ) scenario_text = " and ".join( [ f'when condition "{k}" is {v}' for (k, v) in scenario["Scenario"].items() ] ) matches.append( RuleMatch( path + [prop], message.format(prop, json_max_size, scenario_text), ) ) else: message = "{0} JSON text cannot be longer than {1} characters" matches.append( RuleMatch( path + [prop], message.format(prop, json_max_size), ) ) return matches def check(self, cfn, properties, specs, path): """Check itself""" matches = [] for p_value, p_path in properties.items_safe(path[:]): for prop in p_value: if prop in specs: value = specs.get(prop).get("Value", {}) if value: value_type = value.get("ValueType", "") primitive_type = specs.get(prop).get("PrimitiveType") if primitive_type == "Json": matches.extend( self.check_value( p_value, p_path, prop, cfn, RESOURCE_SPECS.get(cfn.regions[0]) .get("ValueTypes") .get(value_type, {}), ) ) return matches def match_resource_sub_properties(self, properties, property_type, path, cfn): """Match for sub properties""" matches = [] specs = ( RESOURCE_SPECS.get(cfn.regions[0]) .get("PropertyTypes") .get(property_type, {}) .get("Properties", {}) ) matches.extend(self.check(cfn, properties, specs, path)) return matches def match_resource_properties(self, properties, resource_type, path, cfn): """Check CloudFormation Properties""" matches = [] specs = ( RESOURCE_SPECS.get(cfn.regions[0]) .get("ResourceTypes") .get(resource_type, {}) .get("Properties", {}) ) matches.extend(self.check(cfn, properties, specs, path)) return matches