""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 """ import argparse import copy import glob import json import logging import os import sys from pathlib import Path from typing import Dict import jsonschema import cfnlint.decode.cfn_yaml from cfnlint.helpers import REGIONS from cfnlint.version import __version__ # pylint: disable=too-many-public-methods LOGGER = logging.getLogger("cfnlint") def configure_logging(debug_logging, info_logging): ch = logging.StreamHandler() ch.setLevel(logging.DEBUG) if debug_logging: LOGGER.setLevel(logging.DEBUG) elif info_logging: LOGGER.setLevel(logging.INFO) else: LOGGER.setLevel(logging.NOTSET) log_formatter = logging.Formatter( "%(asctime)s - %(name)s - %(levelname)s - %(message)s" ) ch.setFormatter(log_formatter) # make sure all other log handlers are removed before adding it back for handler in LOGGER.handlers: LOGGER.removeHandler(handler) LOGGER.addHandler(ch) class ConfigFileArgs: """ Config File arguments. Parses .cfnlintrc OR .cfnlintrc.yaml OR .cfnlintrc.yml in the Home and Project folder. """ file_args: Dict = {} __user_config_file = None __project_config_file = None __custom_config_file = None def __init__(self, schema=None, config_file=None): # self.file_args = self.get_config_file_defaults() self.file_args = {} self.default_schema_file = Path(__file__).parent.joinpath( "data/CfnLintCli/config/schema.json" ) with self.default_schema_file.open(encoding="utf-8") as f: self.default_schema = json.load(f) self.schema = self.default_schema if not schema else schema if config_file: self.__custom_config_file = config_file else: LOGGER.debug("Looking for CFLINTRC before attempting to load") self.__user_config_file, self.__project_config_file = self._find_config() self.load() def _find_config(self): """Looks up for user and project level config Returns ------- Tuple (Path, Path) Tuple with both configs and whether they were found Example ------- > user_config, project_config = self._find_config() """ config_file_name = ".cfnlintrc" self.__user_config_file = Path.home().joinpath(config_file_name) self.__project_config_file = Path.cwd().joinpath(config_file_name) if self._has_file(config_file_name + ".yaml"): self.__project_config_file = Path.cwd().joinpath(config_file_name + ".yaml") elif self._has_file(config_file_name + ".yml"): self.__project_config_file = Path.cwd().joinpath(config_file_name + ".yml") user_config_path = "" project_config_path = "" if self._has_file(self.__user_config_file): LOGGER.debug("Found User CFNLINTRC") user_config_path = self.__user_config_file if self._has_file(self.__project_config_file): LOGGER.debug("Found Project level CFNLINTRC") project_config_path = self.__project_config_file return user_config_path, project_config_path def _has_file(self, filename): """Confirm whether file exists Parameters ---------- filename : str Path to a file Returns ------- Boolean """ return Path(filename).is_file() def load(self): """Load configuration file and expose as a dictionary Returns ------- Dict CFLINTRC configuration """ if self.__custom_config_file: custom_config = self._read_config(self.__custom_config_file) LOGGER.debug("Validating Custom CFNLINTRC") self.validate_config(custom_config, self.schema) LOGGER.debug("Custom configuration loaded as") LOGGER.debug("%s", custom_config) self.file_args = custom_config else: user_config = self._read_config(self.__user_config_file) LOGGER.debug("Validating User CFNLINTRC") self.validate_config(user_config, self.schema) project_config = self._read_config(self.__project_config_file) LOGGER.debug("Validating Project CFNLINTRC") self.validate_config(project_config, self.schema) LOGGER.debug("User configuration loaded as") LOGGER.debug("%s", user_config) LOGGER.debug("Project configuration loaded as") LOGGER.debug("%s", project_config) LOGGER.debug("Merging configurations...") self.file_args = self.merge_config(user_config, project_config) def validate_config(self, config, schema): """Validate configuration against schema Parameters ---------- config : dict CFNLINTRC configuration schema : dict JSONSchema to validate against Raises ------- jsonschema.exceptions.ValidationError Returned when cfnlintrc doesn't match schema provided """ LOGGER.debug("Validating CFNLINTRC config with given JSONSchema") LOGGER.debug("Schema used: %s", schema) LOGGER.debug("Config used: %s", config) jsonschema.validate(config, schema) LOGGER.debug("CFNLINTRC looks valid!") def merge_config(self, user_config, project_config): """Merge project and user configuration into a single dictionary Creates a new configuration with both configuration merged it favours project level over user configuration if keys are duplicated NOTE ---- It takes any number of nested dicts It overrides lists found in user_config with project_config Parameters ---------- user_config : Dict User configuration (~/.cfnlintrc) found at user's home directory project_config : Dict Project configuration (.cfnlintrc) found at current directory Returns ------- Dict Merged configuration """ # Recursively override User config with Project config for key in user_config: if key in project_config: # If both keys are the same, let's check whether they have nested keys if isinstance(user_config[key], dict) and isinstance( project_config[key], dict ): self.merge_config(user_config[key], project_config[key]) else: user_config[key] = project_config[key] LOGGER.debug( "Overriding User's key %s with Project's specific value %s.", key, project_config[key], ) # Project may have unique config we need to copy over too # so that we can have user+project config available as one for key in project_config: if key not in user_config: user_config[key] = project_config[key] return user_config def _read_config(self, config): """Parse given YAML configuration Returns ------- Dict Parsed YAML configuration as dictionary """ config = Path(config) config_template = None if self._has_file(config): LOGGER.debug("Parsing CFNLINTRC") config_template = cfnlint.decode.cfn_yaml.load(str(config)) if not config_template: config_template = {} return config_template def comma_separated_arg(string): """Split a comma separated string""" return string.split(",") def _ensure_value(namespace, name, value): if getattr(namespace, name, None) is None: setattr(namespace, name, value) return getattr(namespace, name) class RuleConfigurationAction(argparse.Action): """Override the default Action""" def __init__( self, option_strings, dest, nargs=None, const=None, default=None, type=None, choices=None, required=False, help=None, metavar=None, ): # pylint: disable=W0622 super().__init__( option_strings=option_strings, dest=dest, nargs=nargs, const=const, default=default, type=type, choices=choices, required=required, help=help, metavar=metavar, ) def _parse_rule_configuration(self, string): """Parse the config rule structure""" configs = comma_separated_arg(string) results = {} for config in configs: rule_id = config.split(":")[0] config_name = config.split(":")[1].split("=")[0] config_value = config.split(":")[1].split("=")[1] if rule_id not in results: results[rule_id] = {} results[rule_id][config_name] = config_value return results def __call__(self, parser, namespace, values, option_string=None): items = copy.copy(_ensure_value(namespace, self.dest, {})) try: for value in values: new_value = self._parse_rule_configuration(value) for v_k, v_vs in new_value.items(): if v_k in items: for s_k, s_v in v_vs.items(): items[v_k][s_k] = s_v else: items[v_k] = v_vs setattr(namespace, self.dest, items) except Exception: # pylint: disable=W0703 parser.print_help() parser.exit() class CliArgs: """Base Args class""" cli_args: Dict = {} def __init__(self, cli_args): self.parser = self.create_parser() self.cli_args, _ = self.parser.parse_known_args(cli_args) def create_parser(self): """Do first round of parsing parameters to set options""" class ArgumentParser(argparse.ArgumentParser): """Override Argument Parser so we can control the exit code""" def error(self, message): self.print_help(sys.stderr) self.exit(32, f"{self.prog}: error: {message}\n") class ExtendAction(argparse.Action): """Support argument types that are lists and can be specified multiple times.""" def __call__(self, parser, namespace, values, option_string=None): items = getattr(namespace, self.dest) items = [] if items is None else items for value in values: if isinstance(value, list): items.extend(value) else: items.append(value) setattr(namespace, self.dest, items) usage = ( "\nBasic: cfn-lint test.yaml\n" "Ignore a rule: cfn-lint -i E3012 -- test.yaml\n" "Configure a rule: cfn-lint -x E3012:strict=true -t test.yaml\n" "Lint all yaml files in a folder: cfn-lint dir/**/*.yaml" ) parser = ArgumentParser(description="CloudFormation Linter", usage=usage) parser.register("action", "extend", ExtendAction) standard = parser.add_argument_group("Standard") advanced = parser.add_argument_group("Advanced / Debugging") # Allow the template to be passes as an optional or a positional argument standard.add_argument( "templates", metavar="TEMPLATE", nargs="*", help="The CloudFormation template to be linted", ) standard.add_argument( "-t", "--template", metavar="TEMPLATE", dest="template_alt", help="The CloudFormation template to be linted", nargs="+", default=[], action="extend", ) standard.add_argument( "-b", "--ignore-bad-template", help="Ignore failures with Bad template", action="store_true", ) standard.add_argument( "--ignore-templates", dest="ignore_templates", help="Ignore templates", nargs="+", default=[], action="extend", ) advanced.add_argument( "-D", "--debug", help="Enable debug logging", action="store_true" ) advanced.add_argument( "-I", "--info", help="Enable information logging", action="store_true" ) standard.add_argument( "-f", "--format", help="Output Format", choices=["quiet", "parseable", "json", "junit", "pretty", "sarif"], ) standard.add_argument( "-l", "--list-rules", dest="listrules", default=False, action="store_true", help="list all the rules", ) standard.add_argument( "-r", "--regions", dest="regions", nargs="+", default=[], type=comma_separated_arg, action="extend", help="list the regions to validate against.", ) advanced.add_argument( "-a", "--append-rules", dest="append_rules", nargs="+", default=[], type=comma_separated_arg, action="extend", help="specify one or more rules directories using " "one or more --append-rules arguments. ", ) standard.add_argument( "-i", "--ignore-checks", dest="ignore_checks", nargs="+", default=[], type=comma_separated_arg, action="extend", help="only check rules whose id do not match these values", ) standard.add_argument( "-c", "--include-checks", dest="include_checks", nargs="+", default=[], type=comma_separated_arg, action="extend", help="include rules whose id match these values", ) standard.add_argument( "-m", "--mandatory-checks", dest="mandatory_checks", nargs="+", default=[], type=comma_separated_arg, action="extend", help="always check rules whose id match these values, regardless of template exclusions", ) standard.add_argument( "-e", "--include-experimental", help="Include experimental rules", action="store_true", ) standard.add_argument( "-x", "--configure-rule", dest="configure_rules", nargs="+", default={}, action=RuleConfigurationAction, help="Provide configuration for a rule. Format RuleId:key=value. Example: E3012:strict=true", ) standard.add_argument( "--config-file", dest="config_file", help="Specify the cfnlintrc file to use", ) standard.add_argument( "-z", "--custom-rules", dest="custom_rules", help="Allows specification of a custom rule file.", ) advanced.add_argument( "-o", "--override-spec", dest="override_spec", help="A CloudFormation Spec override file that allows customization", ) advanced.add_argument( "-g", "--build-graph", help="Creates a file in the same directory as the template that models the template's resources in DOT format", action="store_true", ) advanced.add_argument( "-s", "--registry-schemas", help="one or more directories of CloudFormation Registry Schemas", action="extend", type=comma_separated_arg, nargs="+", ) standard.add_argument( "-v", "--version", help="Version of cfn-lint", action="version", version=f"%(prog)s {__version__}", ) advanced.add_argument( "-u", "--update-specs", help="Update the CloudFormation Specs", action="store_true", ) advanced.add_argument( "--update-documentation", help=argparse.SUPPRESS, action="store_true" ) advanced.add_argument( "--update-iam-policies", help=argparse.SUPPRESS, action="store_true" ) standard.add_argument( "--output-file", type=str, default=None, help="Writes the output to the specified file, ideal for producing reports", ) standard.add_argument( "--merge-configs", default=False, action="store_true", help="Merges lists between configuration layers", ) standard.add_argument( "--non-zero-exit-code", type=str, default="informational", choices=["informational", "warning", "error", "none"], help="Exit code will be non zero from the specified rule class and higher", ) advanced.add_argument("--force", help=argparse.SUPPRESS, action="store_true") return parser class TemplateArgs: """Per Template Args""" def __init__(self, template_args): self.set_template_args(template_args) def get_template_args(self): return self._template_args def set_template_args(self, template): defaults = {} if isinstance(template, dict): metadata = template.get("Metadata", {}) if metadata: configs = ( template.get("Metadata", {}).get("cfn-lint", {}).get("config", {}) ) if isinstance(configs, dict): for config_name, config_value in configs.items(): if config_name == "ignore_checks": if isinstance(config_value, list): defaults["ignore_checks"] = config_value if config_name == "regions": if isinstance(config_value, list): defaults["regions"] = config_value if config_name == "append_rules": if isinstance(config_value, list): defaults["append_rules"] = config_value if config_name == "override_spec": if isinstance(config_value, (str)): defaults["override_spec"] = config_value if config_name == "custom_rules": if isinstance(config_value, (str)): defaults["custom_rules"] = config_value if config_name == "ignore_bad_template": if isinstance(config_value, bool): defaults["ignore_bad_template"] = config_value if config_name == "include_checks": if isinstance(config_value, list): defaults["include_checks"] = config_value if config_name == "configure_rules": if isinstance(config_value, dict): defaults["configure_rules"] = config_value if config_name == "include_experimental": if isinstance(config_value, bool): defaults["include_experimental"] = config_value self._template_args = defaults template_args = property(get_template_args, set_template_args) # pylint: disable=too-many-public-methods class ConfigMixIn(TemplateArgs, CliArgs, ConfigFileArgs): """Mixin for the Configs""" def __init__(self, cli_args): CliArgs.__init__(self, cli_args) # configure debug as soon as we can configure_logging(self.cli_args.debug, self.cli_args.info) TemplateArgs.__init__(self, {}) ConfigFileArgs.__init__( self, config_file=self._get_argument_value("config_file", False, False) ) def _get_argument_value(self, arg_name, is_template, is_config_file): cli_value = getattr(self.cli_args, arg_name) template_value = self.template_args.get(arg_name) file_value = self.file_args.get(arg_name) # merge list configurations # make sure we don't do an infinite loop so skip this check for merge_configs if arg_name != "merge_configs": if self.merge_configs: # the CLI will always have an empty list when the item is a list # we will use that to evaluate if we need to merge the lists if isinstance(cli_value, list): # Use a copy here, otherwise we will accumulate template level config # into the cli_value which will persist between template files result = cli_value.copy() if isinstance(template_value, list): result.extend(template_value) if isinstance(file_value, list): result.extend(file_value) return result # return individual items if cli_value: return cli_value if template_value and is_template: return template_value if file_value and is_config_file: return file_value return cli_value @property def ignore_checks(self): return self._get_argument_value("ignore_checks", True, True) @property def include_checks(self): results = self._get_argument_value("include_checks", True, True) return ["W", "E"] + results @property def mandatory_checks(self): return self._get_argument_value("mandatory_checks", False, True) @property def include_experimental(self): return self._get_argument_value("include_experimental", True, True) @property def regions(self): results = self._get_argument_value("regions", True, True) if not results: default_region_env = os.environ.get("AWS_DEFAULT_REGION", "us-east-1") return [os.environ.get("AWS_REGION", default_region_env)] if "ALL_REGIONS" in results: return REGIONS return results @property def ignore_bad_template(self): return self._get_argument_value("ignore_bad_template", True, True) @property def debug(self): return self._get_argument_value("debug", False, False) @property def format(self): return self._get_argument_value("format", False, True) @property def templates(self): templates_args = self._get_argument_value("templates", False, True) template_alt_args = self._get_argument_value("template_alt", False, False) if template_alt_args: filenames = template_alt_args elif templates_args: filenames = templates_args else: return None # if only one is specified convert it to array if isinstance(filenames, str): filenames = [filenames] # handle different shells and Config files # some shells don't expand * and configparser won't expand wildcards all_filenames = [] ignore_templates = self._ignore_templates() for filename in filenames: add_filenames = glob.glob(filename, recursive=True) # only way to know of the glob failed is to test it # then add the filename as requested if not add_filenames: if filename not in ignore_templates: all_filenames.append(filename) else: for add_filename in add_filenames: if add_filename not in ignore_templates: all_filenames.append(add_filename) return sorted(all_filenames) def _ignore_templates(self): ignore_template_args = self._get_argument_value("ignore_templates", False, True) if ignore_template_args: filenames = ignore_template_args else: return [] # if only one is specified convert it to array if isinstance(filenames, str): filenames = [filenames] # handle different shells and Config files # some shells don't expand * and configparser won't expand wildcards all_filenames = [] for filename in filenames: add_filenames = glob.glob(filename, recursive=True) # only way to know of the glob failed is to test it # then add the filename as requested if not add_filenames: all_filenames.append(filename) else: all_filenames.extend(add_filenames) return all_filenames @property def append_rules(self): return self._get_argument_value("append_rules", False, True) @property def override_spec(self): return self._get_argument_value("override_spec", False, True) @property def custom_rules(self): """custom_rules_spec""" return self._get_argument_value("custom_rules", False, True) @property def update_specs(self): return self._get_argument_value("update_specs", False, False) @property def update_documentation(self): return self._get_argument_value("update_documentation", False, False) @property def update_iam_policies(self): return self._get_argument_value("update_iam_policies", False, False) @property def listrules(self): return self._get_argument_value("listrules", False, False) @property def configure_rules(self): return self._get_argument_value("configure_rules", True, True) @property def config_file(self): return self._get_argument_value("config_file", False, False) @property def build_graph(self): return self._get_argument_value("build_graph", False, False) @property def output_file(self): return self._get_argument_value("output_file", False, True) @property def registry_schemas(self): return self._get_argument_value("registry_schemas", False, True) @property def merge_configs(self): return self._get_argument_value("merge_configs", True, True) @property def non_zero_exit_code(self): return self._get_argument_value("non_zero_exit_code", False, False) @property def force(self): return self._get_argument_value("force", False, False)