""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 """ import fnmatch import json import logging import multiprocessing import os import subprocess import warnings import zipfile from copy import deepcopy from io import BytesIO from urllib.request import Request, urlopen import jsonpatch import regex as re import cfnlint import cfnlint.data.AdditionalSpecs from cfnlint.helpers import SPEC_REGIONS, get_url_content, url_has_newer_version LOGGER = logging.getLogger(__name__) REGISTRY_SCHEMA_ZIP = ( "https://schema.cloudformation.us-east-1.amazonaws.com/CloudformationSchema.zip" ) def update_resource_specs(force: bool = False): # Pool() uses cpu count if no number of processors is specified # Pool() only implements the Context Manager protocol from Python3.3 onwards, # so it will fail Python2.7 style linting, as well as throw AttributeError schema_cache = get_schema_value_types() # pre-work us-east-1 for comparison later update_resource_spec("us-east-1", SPEC_REGIONS["us-east-1"], schema_cache, force) try: # pylint: disable=not-context-manager with multiprocessing.Pool() as pool: # Patch from registry schema pool_tuple = [ (k, v, schema_cache, force) for k, v in SPEC_REGIONS.items() if k != "us-east-1" ] pool.starmap(update_resource_spec, pool_tuple) except AttributeError: # Do it the long, slow way for region, url in SPEC_REGIONS.items(): if region != "us-east-1": update_resource_spec(region, url, schema_cache, force) def update_resource_spec(region, url, schema_cache, force: bool = False): """Update a single resource spec""" filename = os.path.join( os.path.dirname(cfnlint.__file__), f"data/CloudSpecs/{region}.json" ) schema_cache = deepcopy(schema_cache) multiprocessing_logger = multiprocessing.log_to_stderr() multiprocessing_logger.debug("Downloading template %s into %s", url, filename) # Check to see if we already have the latest version, and if so stop if not (url_has_newer_version(url) or force): return spec_content = get_url_content(url, caching=True) multiprocessing_logger.debug( "A more recent version of %s was found, and will be downloaded to %s", url, filename, ) spec = json.loads(spec_content) # Patch the files spec = patch_spec(spec, "all") spec = patch_spec(spec, region) # do each patch individually so we can ignore errors for patch in schema_cache: try: # since there could be patched in values to ValueTypes # Ref/GetAtt as an example. 
            for i_patch in patch:
                path_details = i_patch.get("path").split("/")
                if path_details[1] == "ValueTypes":
                    if not spec.get("ValueTypes").get(path_details[2]):
                        spec["ValueTypes"][path_details[2]] = {}
            # do the patch
            jsonpatch.JsonPatch(patch).apply(spec, in_place=True)
        except jsonpatch.JsonPatchConflict:
            for i_patch in patch:
                path_details = i_patch.get("path").split("/")
                if path_details[1] == "ValueTypes":
                    if not spec.get("ValueTypes").get(path_details[2]):
                        try:
                            del spec["ValueTypes"][path_details[2]]
                        except:  # pylint: disable=bare-except
                            pass
            LOGGER.debug("Patch (%s) not applied in region %s", patch, region)
        except jsonpatch.JsonPointerException:
            for i_patch in patch:
                path_details = i_patch.get("path").split("/")
                if path_details[1] == "ValueTypes":
                    if not spec.get("ValueTypes").get(path_details[2]):
                        try:
                            del spec["ValueTypes"][path_details[2]]
                        except:  # pylint: disable=bare-except
                            pass
                if path_details[1] == "ResourceTypes":
                    if (
                        spec.get("ResourceTypes")
                        .get(path_details[2], {})
                        .get("Attributes", {})
                        .get(path_details[4])
                    ):
                        continue
                    LOGGER.debug(
                        "Parent element not found for patch (%s) in region %s",
                        patch,
                        region,
                    )
                else:
                    # Debug as the parent element isn't supported in the region
                    LOGGER.debug(
                        "Parent element not found for patch (%s) in region %s",
                        patch,
                        region,
                    )

    # Patch provider schema data
    spec = patch_spec(spec, "all", "ProviderSchemasPatches")

    botocore_cache = {}

    def search_and_replace_botocore_types(obj):
        if isinstance(obj, dict):
            new_obj = {}
            for key, value in obj.items():
                if key == "botocore":
                    service_and_type = value.split("/")
                    service = "/".join(service_and_type[:-1])
                    botocore_type = service_and_type[-1]
                    if service not in botocore_cache:
                        botocore_cache[service] = json.loads(
                            get_url_content(
                                "https://raw.githubusercontent.com/boto/botocore/master/botocore/data/"
                                + service
                                + "/service-2.json"
                            )
                        )
                    new_obj["AllowedValues"] = sorted(
                        botocore_cache[service]["shapes"][botocore_type]["enum"]
                    )
                else:
                    new_obj[key] = search_and_replace_botocore_types(value)
            return new_obj

        if isinstance(obj, list):
            new_list = []
            for item in obj:
                new_list.append(search_and_replace_botocore_types(item))
            return new_list

        return obj

    spec = search_and_replace_botocore_types(spec)

    if region != "us-east-1":
        base_specs = cfnlint.helpers.load_resource(
            "cfnlint.data.CloudSpecs", "us-east-1.json"
        )
        for section, section_values in spec.items():
            if section in ["ResourceTypes", "PropertyTypes", "ValueTypes"]:
                for key, value in section_values.items():
                    base_value = base_specs.get(section, {}).get(key, {})
                    if json.dumps(value, sort_keys=True) == json.dumps(
                        base_value, sort_keys=True
                    ):
                        spec[section][key] = "CACHED"

    with open(filename, "w", encoding="utf-8") as f:
        json.dump(spec, f, indent=1, sort_keys=True, separators=(",", ": "))


def update_documentation(rules):
    """Rebuild the rules overview in docs/rules.md"""
    # Update the overview of all rules in the linter
    filename = "docs/rules.md"

    # Sort rules by the Rule ID
    sorted_rules = sorted(rules, key=lambda obj: obj.id)

    data = []

    # Read the current file up to the Rules part; everything up to that point
    # is static documentation.
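    # The "## Rules" heading (compared including its trailing newline) marks
    # where the generated table begins.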
with open(filename, "r", encoding="utf-8") as original_file: line = original_file.readline() while line: data.append(line) if line == "## Rules\n": break line = original_file.readline() # Rebuild the file content with open(filename, "w", encoding="utf-8") as new_file: # Rewrite the static documentation for line in data: new_file.write(line) # Add the rules new_file.write( "(_This documentation is generated by running `cfn-lint --update-documentation`, do not alter this manually_)\n\n" ) new_file.write( f"The following **{len(sorted_rules) + 3}** rules are applied by this linter:\n\n" ) new_file.write( "| Rule ID | Title | Description | Config
        new_file.write(
            "| -------- | ----- | ----------- | ---------- | ------ | ---- |\n"
        )

        rule_output = "| [{0}]({6}) | {1} | {2} | {3} | [Source]({4}) | {5} |\n"

        for rule in [
            cfnlint.rules.ParseError(),
            cfnlint.rules.TransformError(),
            cfnlint.rules.RuleError(),
        ] + sorted_rules:
            rule_source_code_file = (
                "../"
                + subprocess.check_output(
                    [
                        "git",
                        "grep",
                        "-l",
                        'id = "' + rule.id + '"',
                        "src/cfnlint/rules/",
                    ]
                )
                .decode("ascii")
                .strip()
            )
            rule_id = rule.id + "*" if rule.experimental else rule.id
            tags = ",".join(f"`{tag}`" for tag in rule.tags)
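            # Render each config option as name:type:default; the <br/> keeps
            # multiple options on separate lines inside a single table cell.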
".join( f'{key}:{values.get("type")}:{values.get("default")}' for key, values in rule.config_definition.items() ) new_file.write( rule_output.format( rule_id, rule.shortdesc, rule.description, config, rule.source_url, tags, rule_source_code_file, ) ) new_file.write("\n\\* experimental rules\n") def patch_spec(content, region, patch_types="ExtendedSpecs"): """Patch the spec file""" LOGGER.info('Patching spec file for region "%s"', region) append_dir = os.path.join(os.path.dirname(__file__), "data", patch_types, region) for dirpath, _, filenames in os.walk(append_dir): filenames.sort() for filename in fnmatch.filter(filenames, "*.json"): file_path = os.path.basename(filename) module = dirpath.replace(f"{append_dir}", f"{region}").replace( os.path.sep, "." ) all_patches = jsonpatch.JsonPatch( cfnlint.helpers.load_resource( f"cfnlint.data.{patch_types}.{module}", file_path ) ) # Process the generic patches 1 by 1 so we can "ignore" failed ones for all_patch in all_patches: try: jsonpatch.JsonPatch([all_patch]).apply(content, in_place=True) except jsonpatch.JsonPatchConflict: LOGGER.debug( "Patch (%s) not applied in region %s", all_patch, region ) except jsonpatch.JsonPointerException: # Debug as the parent element isn't supported in the region LOGGER.debug( "Parent element not found for patch (%s) in region %s", all_patch, region, ) return content def update_iam_policies(): """update iam policies file""" url = "https://awspolicygen.s3.amazonaws.com/js/policies.js" filename = os.path.join( os.path.dirname(cfnlint.data.AdditionalSpecs.__file__), "Policies.json" ) LOGGER.debug("Downloading policies %s into %s", url, filename) content = get_url_content(url) content = content.split("app.PolicyEditorConfig=")[1] content = json.loads(content) actions = { "Manage Amazon API Gateway": ["HEAD", "OPTIONS"], "Amazon API Gateway Management": ["HEAD", "OPTIONS"], "Amazon API Gateway Management V2": ["HEAD", "OPTIONS"], "Amazon Kinesis Video Streams": ["StartStreamEncryption"], } for k, v in actions.items(): if content.get("serviceMap").get(k): content["serviceMap"][k]["Actions"].extend(v) else: LOGGER.debug('"%s" was not found in the policies file', k) with open(filename, "w", encoding="utf-8") as f: json.dump(content, f, indent=1, sort_keys=True, separators=(",", ": ")) def get_schema_value_types(): def resolve_refs(properties, schema): results = {} name = None if properties.get("$ref"): name = properties.get("$ref").split("/")[-1] subname, results = resolve_refs(schema.get("definitions").get(name), schema) if subname: name = subname properties = schema.get("definitions").get(name) if properties.get("type") == "array": results = properties.get("items") if results: properties = results if results and results.get("$ref"): name, results = resolve_refs(results, schema) if not results: return name, properties return name, results def get_object_details(names, properties, schema): results = {} warnings.filterwarnings("error") for propname, propdetails in properties.items(): if propname == "Tag": continue subname, propdetails = resolve_refs(propdetails, schema) t = propdetails.get("type") if not t: continue if t in ["object"]: if subname is None: subname = propname if propdetails.get("properties"): if subname not in names: results.update( get_object_details( names + [subname], propdetails.get("properties"), schema ) ) elif ( propdetails.get("oneOf") or propdetails.get("anyOf") or propdetails.get("allOf") ): LOGGER.info( "Type %s object for %s has only oneOf,anyOf, or allOf properties", names[0], propname, ) 
                    continue
            elif t not in ["string", "integer", "number", "boolean"]:
                if propdetails.get("$ref"):
                    results.update(
                        get_object_details(
                            names + [propname],
                            schema.get("definitions").get(
                                t.get("$ref").split("/")[-1]
                            ),
                            schema,
                        )
                    )
                elif isinstance(t, list):
                    LOGGER.info(
                        "Type for %s object and %s property is a list",
                        names[0],
                        propname,
                    )
                else:
                    LOGGER.info(
                        "Unable to handle %s object for %s property", names[0], propname
                    )
            elif t == "string":
                if not results.get(".".join(names + [propname])):
                    if (
                        propdetails.get("pattern")
                        or propdetails.get("maxLength")
                        or propdetails.get("enum")
                    ):
                        results[".".join(names + [propname])] = {}
                    if propdetails.get("pattern"):
                        p = propdetails.get("pattern")
                        # str.isascii() is only available from Python 3.7, so
                        # use encode() to test whether the pattern is ASCII
                        try:
                            p.encode("ascii")
                        except UnicodeEncodeError:
                            continue
                        try:
                            re.compile(p, re.UNICODE)
                            results[".".join(names + [propname])].update(
                                {"AllowedPatternRegex": p}
                            )
                        except:  # pylint: disable=bare-except
                            LOGGER.info(
                                "Unable to handle regex for type %s and property %s with regex %s",
                                names[0],
                                propname,
                                p,
                            )
                    if propdetails.get("maxLength"):
                        results[".".join(names + [propname])].update(
                            {
                                "StringMin": propdetails.get("minLength", 0),
                                "StringMax": propdetails.get("maxLength"),
                            }
                        )
                    if propdetails.get("enum"):
                        results[".".join(names + [propname])].update(
                            {"AllowedValues": propdetails.get("enum")}
                        )
            elif t in ["number", "integer"]:
                if not results.get(".".join(names + [propname])):
                    if propdetails.get("minimum") and propdetails.get("maximum"):
                        results[".".join(names + [propname])] = {}
                        results[".".join(names + [propname])].update(
                            {
                                "NumberMin": propdetails.get("minimum"),
                                "NumberMax": propdetails.get("maximum"),
                            }
                        )

        return results

    def process_schema(schema):
        details = get_object_details(
            [schema.get("typeName")], schema.get("properties"), schema
        )

        # Remove duplicates
        vtypes = {}
        for n, v in details.items():
            if n.count(".") > 1:
                s = n.split(".")
                vtypes[s[0] + "." + ".".join(s[-2:])] = v
            else:
                vtypes[n] = v

        patches = []
        for n, v in vtypes.items():
            patch = []
            if v:
                if n.count(".") == 2:
                    if ".Tag." in n:
                        continue
                    r_type = "PropertyTypes"
                else:
                    r_type = "ResourceTypes"
                element = {
                    "op": "add",
                    "path": f'/{r_type}/{".".join(n.split(".")[0:-1])}/Properties/{n.split(".")[-1]}/Value',
                    "value": {
                        "ValueType": n,
                    },
                }
                patch.append(element)
                for s, vs in v.items():
                    element = {
                        "op": "add",
                        "path": f"/ValueTypes/{n}/{s}",
                        "value": vs,
                    }
                    patch.append(element)
            if patch:
                patches.append(patch)

        return patches

    req = Request(REGISTRY_SCHEMA_ZIP)
    results = []
    with urlopen(req) as res:
        with zipfile.ZipFile(BytesIO(res.read())) as z:
            for f in z.namelist():
                with z.open(f) as d:
                    if not isinstance(d, str):
                        data = d.read()
                    else:
                        data = d
                    if isinstance(data, bytes):
                        data = data.decode("utf-8")
                    schema = json.loads(data)
                    patches = process_schema(schema)
                    results.extend(patches)

    return results
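

# Minimal manual entry point; a sketch only. The assumption here is that these
# maintenance functions are normally driven by the cfn-lint CLI, so running
# this file directly is just a convenience for local testing.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO)
    # Refresh the per-region spec files under data/CloudSpecs/ (set force=True
    # to rebuild even when the upstream specs have not changed).
    update_resource_specs(force=False)
    # Refresh the IAM action list used for policy validation.
    update_iam_policies()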