"""
Helpers for loading resources, managing specs, constants, etc.

Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
SPDX-License-Identifier: MIT-0
"""
import datetime
import fnmatch
import gzip
import hashlib
import importlib
import importlib.resources as pkg_resources
import inspect
import json
import logging
import os
import sys
from io import BytesIO
from typing import Dict, List
from urllib.request import Request, urlopen

# NOTE: ``re`` in this module is the third-party ``regex`` package, not the
# standard-library ``re`` module.
import regex as re

from cfnlint.data import CloudSpecs

# Module-level logger, named after this module.
LOGGER = logging.getLogger(__name__)

# Maps each supported AWS region name to the URL of that region's
# CloudFormationResourceSpecification.json.  Most URLs use a ``/latest/gzip/``
# path; the ap-southeast-3 and ap-southeast-4 entries use ``/latest/`` without
# the ``gzip`` segment.
SPEC_REGIONS = {
    "af-south-1": "https://cfn-resource-specifications-af-south-1-prod.s3.af-south-1.amazonaws.com/latest/gzip/CloudFormationResourceSpecification.json",
    "ap-east-1": "https://cfn-resource-specifications-ap-east-1-prod.s3.ap-east-1.amazonaws.com/latest/gzip/CloudFormationResourceSpecification.json",
    "ap-northeast-1": "https://d33vqc0rt9ld30.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "ap-northeast-2": "https://d1ane3fvebulky.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "ap-northeast-3": "https://d2zq80gdmjim8k.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "ap-south-1": "https://d2senuesg1djtx.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "ap-south-2": "https://cfn-resource-specifications-ap-south-2-prod.s3.ap-south-2.amazonaws.com/latest/gzip/CloudFormationResourceSpecification.json",
    "ap-southeast-1": "https://doigdx0kgq9el.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "ap-southeast-2": "https://d2stg8d246z9di.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "ap-southeast-3": "https://cfn-resource-specifications-ap-southeast-3-prod.s3.ap-southeast-3.amazonaws.com/latest/CloudFormationResourceSpecification.json",
    "ap-southeast-4": "https://cfn-resource-specifications-ap-southeast-4-prod.s3.ap-southeast-4.amazonaws.com/latest/CloudFormationResourceSpecification.json",
    "ca-central-1": "https://d2s8ygphhesbe7.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "cn-north-1": "https://cfn-resource-specifications-cn-north-1-prod.s3.cn-north-1.amazonaws.com.cn/latest/gzip/CloudFormationResourceSpecification.json",
    "cn-northwest-1": "https://cfn-resource-specifications-cn-northwest-1-prod.s3.cn-northwest-1.amazonaws.com.cn/latest/gzip/CloudFormationResourceSpecification.json",
    "il-central-1": "https://cfn-resource-specifications-il-central-1-prod.s3.il-central-1.amazonaws.com/latest/gzip/CloudFormationResourceSpecification.json",
    "eu-central-1": "https://d1mta8qj7i28i2.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "eu-central-2": "https://cfn-resource-specifications-eu-central-2-prod.s3.eu-central-2.amazonaws.com/latest/gzip/CloudFormationResourceSpecification.json",
    "eu-north-1": "https://diy8iv58sj6ba.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "eu-south-1": "https://cfn-resource-specifications-eu-south-1-prod.s3.eu-south-1.amazonaws.com/latest/gzip/CloudFormationResourceSpecification.json",
    "eu-south-2": "https://cfn-resource-specifications-eu-south-2-prod.s3.eu-south-2.amazonaws.com/latest/gzip/CloudFormationResourceSpecification.json",
    "eu-west-1": "https://d3teyb21fexa9r.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "eu-west-2": "https://d1742qcu2c1ncx.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "eu-west-3": "https://d2d0mfegowb3wk.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "me-south-1": "https://cfn-resource-specifications-me-south-1-prod.s3.me-south-1.amazonaws.com/latest/gzip/CloudFormationResourceSpecification.json",
    "me-central-1": "https://cfn-resource-specifications-me-central-1-prod.s3.me-central-1.amazonaws.com/latest/gzip/CloudFormationResourceSpecification.json",
    "sa-east-1": "https://d3c9jyj3w509b0.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "us-east-1": "https://d1uauaxba7bl26.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "us-east-2": "https://dnwj8swjjbsbt.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "us-gov-east-1": "https://s3.us-gov-east-1.amazonaws.com/cfn-resource-specifications-us-gov-east-1-prod/latest/gzip/CloudFormationResourceSpecification.json",
    "us-gov-west-1": "https://s3.us-gov-west-1.amazonaws.com/cfn-resource-specifications-us-gov-west-1-prod/latest/gzip/CloudFormationResourceSpecification.json",
    "us-west-1": "https://d68hl49wbnanq.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
    "us-west-2": "https://d201a2mn26r7lk.cloudfront.net/latest/gzip/CloudFormationResourceSpecification.json",
}

# YAML tag that identifies a mapping node.
TAG_MAP = "tag:yaml.org,2002:map"
# NOTE(review): the next three constants are not used in this module;
# presumably they drive the expansion of YAML short-form intrinsic tags
# elsewhere (names that keep their spelling vs. names that gain the "Fn::"
# prefix) -- confirm against the callers.
UNCONVERTED_SUFFIXES = ["Ref", "Condition"]
FN_PREFIX = "Fn::"
CONDITION_FUNCTIONS = ["Fn::If"]

# Region names, in the same order as the keys of SPEC_REGIONS.
REGIONS = list(SPEC_REGIONS.keys())

# Only ASCII letters and digits; the empty string also matches.
REGEX_ALPHANUMERIC = re.compile("^[a-zA-Z0-9]*$")
# Dotted-quad IPv4 address (each octet 0-255) followed by a mandatory
# "/<prefix>" with prefix 0-32.
REGEX_CIDR = re.compile(
    r"^(([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])\.){3}([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])(\/([0-9]|[1-2][0-9]|3[0-2]))$"
)
# Dotted-quad IPv4 address without a prefix length.
REGEX_IPV4 = re.compile(
    r"^(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)(\.(25[0-5]|2[0-4][0-9]|[01]?[0-9][0-9]?)){3}$"
)
# IPv6 address pattern; compiled case-insensitively (re.I) with re.S.
REGEX_IPV6 = re.compile(
    r"^(((?=.*(::))(?!.*\3.+\3))\3?|[\dA-F]{1,4}:)([\dA-F]{1,4}(\3|:\b)|\2){5}(([\dA-F]{1,4}(\3|:\b|$)|\2){2}|(((2[0-4]|1\d|[1-9])?\d|25[0-5])\.?\b){4})\Z",
    re.I | re.S,
)
# Any string containing a "{{resolve:...}}" dynamic reference.
REGEX_DYN_REF = re.compile(r"^.*{{resolve:.+}}.*$")
# "{{resolve:ssm:<name>}}" with an optional ":<digits>" suffix after the name.
REGEX_DYN_REF_SSM = re.compile(r"^.*{{resolve:ssm:[a-zA-Z0-9_\.\-/]+(:\d+)?}}.*$")
# Same shape as REGEX_DYN_REF_SSM but for the "ssm-secure" service name.
REGEX_DYN_REF_SSM_SECURE = re.compile(
    r"^.*{{resolve:ssm-secure:[a-zA-Z0-9_\.\-/]+(:\d+)?}}.*$"
)

# Intrinsic function names known to this module.
FUNCTIONS = [
    "Fn::Base64",
    "Fn::GetAtt",
    "Fn::GetAZs",
    "Fn::ImportValue",
    "Fn::Join",
    "Fn::Split",
    "Fn::FindInMap",
    "Fn::Select",
    "Ref",
    "Fn::If",
    "Fn::Contains",
    "Fn::Sub",
    "Fn::Cidr",
    "Fn::Length",
    "Fn::ToJsonString",
]

# NOTE(review): presumably the functions whose result is a list of values
# rather than a single value -- confirm against the rules that consume this.
FUNCTIONS_MULTIPLE = ["Fn::GetAZs", "Fn::Split"]
# FindInMap can be singular or multiple.
# Fn::FindInMap can be singular or multiple, so it needs to be accounted for
# individually.
# NOTE(review): set("Fn::FindInMap") builds a set of the string's *characters*,
# not a one-element set, so the last subtraction removes nothing and
# "Fn::FindInMap" stays in FUNCTIONS_SINGLE.  The expression is kept unchanged
# because code outside this module may depend on the current contents; confirm
# the intent before switching to {"Fn::FindInMap"}.  Also note that the element
# order of this list is not stable, since it is built from a set.
FUNCTIONS_SINGLE = list(set(FUNCTIONS) - set(FUNCTIONS_MULTIPLE) - set("Fn::FindInMap"))

FUNCTION_IF = "Fn::If"
FUNCTION_AND = "Fn::And"
FUNCTION_OR = "Fn::Or"
FUNCTION_NOT = "Fn::Not"
FUNCTION_EQUALS = "Fn::Equals"
# "Fn::ForEach::<identifier>" where the identifier is ASCII alphanumeric.
FUNCTION_FOR_EACH = re.compile(r"^Fn::ForEach::[a-zA-Z0-9]+$")

PSEUDOPARAMS = [
    "AWS::AccountId",
    "AWS::NotificationARNs",
    "AWS::NoValue",
    "AWS::Partition",
    "AWS::Region",
    "AWS::StackId",
    "AWS::StackName",
    "AWS::URLSuffix",
]

# Template size limits, grouped by template section.
LIMITS = {
    "Mappings": {
        "number": 200,
        "attributes": 200,
        "name": 255,  # in characters
    },
    "Outputs": {
        "number": 200,
        "name": 255,  # in characters
        "description": 1024,  # in bytes
    },
    "Parameters": {
        "number": 200,
        "name": 255,  # in characters
        "value": 4096,  # in bytes
    },
    "Resources": {
        "number": 500,
        "name": 255,  # in characters
    },
    "template": {
        "body": 1000000,  # in bytes
        "description": 1024,  # in bytes
    },
    "threshold": 0.9,  # for rules about approaching the other limit values
}

valid_snapshot_types = [
    "AWS::EC2::Volume",
    "AWS::ElastiCache::CacheCluster",
    "AWS::ElastiCache::ReplicationGroup",
    "AWS::Neptune::DBCluster",
    "AWS::RDS::DBCluster",
    "AWS::RDS::DBInstance",
    "AWS::Redshift::Cluster",
]

# NOTE(review): the 13 identical "AWS::SSM::Parameter::Value" entries below look
# like "AWS::SSM::Parameter::Value<...>" strings whose angle-bracket type
# argument was stripped somewhere upstream.  They are reproduced unchanged
# here; restore them from the authoritative source before relying on this list.
VALID_PARAMETER_TYPES_SINGLE = [
    "AWS::EC2::AvailabilityZone::Name",
    "AWS::EC2::Image::Id",
    "AWS::EC2::Instance::Id",
    "AWS::EC2::KeyPair::KeyName",
    "AWS::EC2::SecurityGroup::GroupName",
    "AWS::EC2::SecurityGroup::Id",
    "AWS::EC2::Subnet::Id",
    "AWS::EC2::VPC::Id",
    "AWS::EC2::Volume::Id",
    "AWS::Route53::HostedZone::Id",
    "AWS::SSM::Parameter::Name",
    "Number",
    "String",
    "AWS::SSM::Parameter::Value",
    "AWS::SSM::Parameter::Value",
    "AWS::SSM::Parameter::Value",
    "AWS::SSM::Parameter::Value",
    "AWS::SSM::Parameter::Value",
    "AWS::SSM::Parameter::Value",
    "AWS::SSM::Parameter::Value",
    "AWS::SSM::Parameter::Value",
    "AWS::SSM::Parameter::Value",
    "AWS::SSM::Parameter::Value",
    "AWS::SSM::Parameter::Value",
    "AWS::SSM::Parameter::Value",
    "AWS::SSM::Parameter::Value",
]

# NOTE(review): same concern as above -- the repeated "List" entries and the
# "AWS::SSM::Parameter::Value>" entries (note the dangling ">") look like
# "List<...>" / "AWS::SSM::Parameter::Value<List<...>>" strings that lost
# their type arguments.  Reproduced unchanged; verify against the
# authoritative source.
VALID_PARAMETER_TYPES_LIST = [
    "CommaDelimitedList",
    "List",
    "List",
    "List",
    "List",
    "List",
    "List",
    "List",
    "List",
    "List",
    "List",
    "List",
    "AWS::SSM::Parameter::Value",
    "AWS::SSM::Parameter::Value>",
    "AWS::SSM::Parameter::Value>",
    "AWS::SSM::Parameter::Value>",
    "AWS::SSM::Parameter::Value>",
    "AWS::SSM::Parameter::Value>",
    "AWS::SSM::Parameter::Value>",
    "AWS::SSM::Parameter::Value>",
    "AWS::SSM::Parameter::Value>",
    "AWS::SSM::Parameter::Value>",
    "AWS::SSM::Parameter::Value>",
    "AWS::SSM::Parameter::Value>",
]

VALID_PARAMETER_TYPES = VALID_PARAMETER_TYPES_SINGLE + VALID_PARAMETER_TYPES_LIST


class RegexDict(dict):
    """Dictionary whose keys are treated as regular expressions on lookup.

    A lookup for ``item`` considers every stored ``key``/``value`` pair:

    * if ``value`` is a dict whose ``"Type"`` is not ``"MODULE"``, the key must
      equal ``item`` exactly;
    * otherwise (a dict with ``"Type" == "MODULE"``, or any non-dict value) the
      key is used as a pattern and must ``re.match`` ``item``.

    When several keys qualify, the longest key wins; among keys of equal
    length the one inserted last wins.
    """

    @staticmethod
    def _key_matches(key, value, item):
        """Return True when a lookup for ``item`` may resolve to ``key``."""
        if isinstance(value, dict) and value.get("Type") != "MODULE":
            return key == item
        return re.match(key, item) is not None

    def __getitem__(self, item):
        """Return the value of the longest key matching ``item``.

        :raises KeyError: if no key matches ``item``
        """
        best_key = None
        for key, value in self.items():
            if not self._key_matches(key, value, item):
                continue
            # ">=" so that a later key of equal length replaces an earlier one.
            if best_key is None or len(key) >= len(best_key):
                best_key = key
        if best_key is None:
            raise KeyError(item)
        return super().__getitem__(best_key)

    def __contains__(self, item):
        """Return True if any key matches ``item`` (see the class docstring)."""
        return any(
            self._key_matches(key, value, item) for key, value in self.items()
        )

    def get(self, key, default=None):
        """Return ``self[key]`` using the regex lookup, or ``default`` on a miss."""
        try:
            return self[key]
        except KeyError:
            return default


def get_metadata_filename(url):
    """Returns the filename for a metadata file associated with a remote resource

    The file lives in ``data/DownloadsMetadata`` next to this module and is named
    after the SHA-256 hex digest of ``url`` with a ``.meta.json`` suffix.
    """
    caching_dir = os.path.join(os.path.dirname(__file__), "data", "DownloadsMetadata")
    encoded_url = hashlib.sha256(url.encode()).hexdigest()
    return os.path.join(caching_dir, encoded_url + ".meta.json")


def url_has_newer_version(url):
    """Checks to see if a newer version of the resource at the URL is available

    Returns True when no ETag is cached for ``url`` or when the ETag returned by
    a HEAD request is missing or differs from the cached one; returns False
    only when the remote ETag equals the cached one.  Errors raised by the
    HEAD request itself are not caught here.
    """
    metadata = load_metadata(get_metadata_filename(url))

    # Etag is a caching identifier used by S3 and Cloudfront.  Without a cached
    # value there is nothing to compare against, so force an update without
    # touching the network.
    cached_etag = metadata.get("etag")
    if not cached_etag:
        return True

    req = Request(url, method="HEAD")
    with urlopen(req) as res:
        remote_etag = res.info().get("ETag")

    if remote_etag and remote_etag == cached_etag:
        LOGGER.debug(
            "We already have a cached version of url %s with ETag value of %s",
            url,
            cached_etag,
        )
        return False

    # The ETag value of the remote resource does not match the local one, so a
    # newer version is available
    return True


def get_url_content(url, caching=False):
    """Get the contents of a spec file

    :param url: URL to download
    :param caching: when True and the response carries an ETag header, record
        the ETag and URL in the metadata file for ``url``
    :return: the response body decoded as UTF-8 (gunzipped first when the
        response declares ``Content-Encoding: gzip``)
    """
    with urlopen(url) as res:
        headers = res.info()
        etag = headers.get("ETag")
        if caching and etag:
            metadata_filename = get_metadata_filename(url)
            # Load in all existing values so unrelated keys are preserved
            metadata = load_metadata(metadata_filename)
            metadata["etag"] = etag
            metadata["url"] = url  # To make it obvious which url the Tag relates to
            save_metadata(metadata, metadata_filename)

        # Continue to handle the file download normally
        payload = res.read()
        if headers.get("Content-Encoding") == "gzip":
            with gzip.GzipFile(fileobj=BytesIO(payload)) as gz_file:
                payload = gz_file.read()

    return payload.decode("utf-8")


def load_metadata(filename):
    """Get the contents of the download metadata file

    :return: the parsed JSON content, or an empty dict if the file is missing
    """
    if not os.path.exists(filename):
        return {}
    with open(filename, "r", encoding="utf-8") as metadata_file:
        return json.load(metadata_file)


def save_metadata(metadata, filename):
    """Save the contents of the download metadata file

    Missing parent directories are created first.
    """
    dirname = os.path.dirname(filename)
    # dirname is "" for a bare filename; makedirs("") would raise.
    if dirname:
        # exist_ok avoids the check-then-create race and also creates any
        # missing intermediate directories.
        os.makedirs(dirname, exist_ok=True)

    with open(filename, "w", encoding="utf-8") as metadata_file:
        json.dump(metadata, metadata_file)


def load_resource(package, filename="us-east-1.json"):
    """Load CloudSpec resources

    :param package: package that contains the resource file
    :param filename: filename to load
    :return: Json output of the resource loaded
    """
    if sys.version_info >= (3, 9):
        return json.loads(
            pkg_resources.files(package)  # pylint: disable=no-member
            .joinpath(filename)
            .read_text(encoding="utf-8")
        )
    return json.loads(pkg_resources.read_text(package, filename, encoding="utf-8"))


RESOURCE_SPECS: Dict[str, dict] = {}

REGISTRY_SCHEMAS: List[dict] = []


def merge_spec(source, destination):
    """Recursive merge spec dict

    Values from ``source`` overwrite those in ``destination``; nested dicts are
    merged key by key.  ``destination`` is modified in place and returned.
    """
    for key, value in source.items():
        if isinstance(value, dict):
            node = destination.get(key)
            if not isinstance(node, dict):
                # Missing, or a non-dict value that a dict override must
                # replace; recursing into a non-dict would raise.
                node = destination[key] = {}
            merge_spec(value, node)
        else:
            destination[key] = value

    return destination


def _compile_resource_patterns(patterns):
    """Compile include/exclude patterns, expanding ``*`` to ``(.*)``."""
    return [re.compile(pattern.replace("*", "(.*)") + "$") for pattern in patterns]


def set_specs(override_spec_data):
    """Override Resource Specs

    Merges ``override_spec_data`` into every region of ``RESOURCE_SPECS`` and
    then prunes resource types according to the optional
    ``IncludeResourceTypes`` / ``ExcludeResourceTypes`` pattern lists it
    carries.  The caller's dict is not modified.
    """
    # Work on a shallow copy so the filter keys can be removed from the data
    # that gets merged without mutating the caller's dict.
    overrides = dict(override_spec_data)
    excludes = _compile_resource_patterns(
        overrides.pop("ExcludeResourceTypes", None) or []
    )
    includes = _compile_resource_patterns(
        overrides.pop("IncludeResourceTypes", None) or []
    )

    for spec in RESOURCE_SPECS.values():
        # Merge override spec file into the AWS Resource specification
        if overrides:
            merge_spec(overrides, spec)

        resource_types = spec["ResourceTypes"]
        # Snapshot the names: entries are deleted from resource_types below.
        all_resources = list(resource_types)

        # Keep only the resources selected by the includes (all if none given)
        if includes:
            kept = {
                name
                for name in all_resources
                if any(regex.match(name) for regex in includes)
            }
        else:
            kept = set(all_resources)

        # Remove unsupported resources using the excludes
        if excludes:
            kept = {
                name
                for name in kept
                if not any(regex.match(name) for regex in excludes)
            }

        # Remove unsupported resources
        for name in all_resources:
            if name not in kept:
                del resource_types[name]


def is_custom_resource(resource_type):
    """Return True if resource_type is a custom resource

    Non-string values (including None) are never custom resources.
    """
    if not isinstance(resource_type, str):
        return False
    return resource_type == "AWS::CloudFormation::CustomResource" or (
        resource_type.startswith("Custom::")
    )


def bool_compare(first, second):
    """Compare strings to boolean values

    A string operand counts as True only when it equals "true" ignoring case;
    the two (converted) operands are then compared by identity.
    """
    if isinstance(first, str):
        first = first.lower() == "true"
    if isinstance(second, str):
        second = second.lower() == "true"
    return first is second


def initialize_specs():
    """Reload Resource Specs"""

    cached_sections = ("ResourceTypes", "PropertyTypes", "ValueTypes")

    def load_region(region):
        """Load one region's spec, resolving "CACHED" entries from us-east-1."""
        spec = load_resource(CloudSpecs, filename=f"{region}.json")

        for section in cached_sections:
            section_values = spec.get(section)
            if section_values is None:
                continue
            for key, value in section_values.items():
                if value != "CACHED":
                    continue
                # Placeholder: copy the definition from us-east-1 when it has
                # a non-empty one; otherwise leave the placeholder in place.
                reference = RESOURCE_SPECS["us-east-1"][section].get(key)
                if reference:
                    section_values[key] = reference
        return spec

    # us-east-1 must be loaded first: the other regions resolve against it.
    RESOURCE_SPECS["us-east-1"] = load_region("us-east-1")
    for region in REGIONS:
        if region != "us-east-1":
            RESOURCE_SPECS[region] = load_region(region)


initialize_specs()


def format_json_string(json_string):
    """Format the given JSON string

    ``json_string`` is passed directly to ``json.dumps``, so it is the object
    to serialize.  Output uses a one-space indent and sorted keys.
    """

    def converter(o):  # pylint: disable=R1710
        """Help convert date/time into strings"""
        if isinstance(o, datetime.datetime):
            return str(o)
        # Any other unsupported object is serialized as null.
        return None

    return json.dumps(
        json_string, indent=1, sort_keys=True, separators=(",", ": "), default=converter
    )


def create_rules(mod):
    """Create and return an instance of each CloudFormationLintRule subclass
    from the given module."""
    result = []
    for _, clazz in inspect.getmembers(mod, inspect.isclass):
        # The generic CustomRule class is deliberately not instantiated here.
        if (
            clazz.__name__ == "CustomRule"
            and clazz.__module__ == "cfnlint.rules.custom"
        ):
            continue

        # Skip the class itself ([1:]) so only strict subclasses qualify.
        is_lint_rule = any(
            base.__module__ in ("cfnlint", "cfnlint.rules")
            and base.__name__ == "CloudFormationLintRule"
            for base in inspect.getmro(clazz)[1:]
        )
        if is_lint_rule:
            # create an instance of subclasses of CloudFormationLintRule
            result.append(clazz())

    return result


# Loader/suffix pair handed to FileFinder so it imports plain source files.
loader_details = (
    importlib.machinery.SourceFileLoader,  # pylint: disable=no-member
    importlib.machinery.SOURCE_SUFFIXES,  # pylint: disable=no-member
)


def import_filename(pluginname, root):
    """import_filename imports a module from a file

    :param pluginname: module name to look for (file name without extension)
    :param root: directory to search
    :return: the executed module, or None if no such module exists in ``root``
    """
    mod_finder = importlib.machinery.FileFinder(  # pylint: disable=no-member
        root, loader_details
    )

    mod_spec = mod_finder.find_spec(pluginname)
    if mod_spec is None:
        return None

    mod = importlib.util.module_from_spec(mod_spec)  # pylint: disable=no-member
    mod_spec.loader.exec_module(mod)
    return mod


def load_plugins(directory):
    """Load plugins

    Walks ``directory`` recursively, imports every ``*.py`` file whose name
    starts with an ASCII letter, and returns the rule instances found in them.
    Errors raised while walking the tree are propagated.
    """
    result = []

    def onerror(os_error):
        """Raise an error"""
        raise os_error

    for root, _, filenames in os.walk(directory, onerror=onerror):
        for filename in fnmatch.filter(filenames, "[A-Za-z]*.py"):
            # splitext drops only the trailing extension; str.replace would
            # also strip ".py" occurring in the middle of the name.
            mod = import_filename(os.path.splitext(filename)[0], root)
            if mod is not None:
                result.extend(create_rules(mod))

    return result


def override_specs(override_spec_file):
    """Override specs file

    Loads the JSON file ``override_spec_file`` and applies it with
    ``set_specs``.  If the file cannot be read or parsed, an error is logged
    and the process exits with status 1.
    """
    import errno  # pylint: disable=import-outside-toplevel

    filename = override_spec_file
    try:
        with open(filename, encoding="utf-8") as fp:
            custom_spec_data = json.load(fp)
    except IOError as err:
        if err.errno == errno.ENOENT:
            LOGGER.error("Override spec file not found: %s", filename)
        elif err.errno == errno.EISDIR:
            LOGGER.error(
                "Override spec file references a directory, not a file: %s", filename
            )
        elif err.errno == errno.EACCES:
            LOGGER.error(
                "Permission denied when accessing override spec file: %s", filename
            )
        else:
            # Previously any other I/O error was swallowed and the function
            # returned as if the override had been applied.
            LOGGER.error("Unable to read override spec file %s: %s", filename, err)
        sys.exit(1)
    except ValueError as err:
        LOGGER.error("Override spec file %s is malformed: %s", filename, err)
        sys.exit(1)

    # Applied outside the try so that errors raised while merging are not
    # misreported as a malformed or unreadable file.
    set_specs(custom_spec_data)