""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 """ from cfnlint.helpers import RESOURCE_SPECS, VALID_PARAMETER_TYPES_LIST from cfnlint.rules import CloudFormationLintRule, RuleMatch class Join(CloudFormationLintRule): """Check if Join values are correct""" id = "E1022" shortdesc = "Join validation of parameters" description = "Making sure the join function is properly configured" source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/intrinsic-function-reference-join.html" tags = ["functions", "join"] def __init__(self): """Initialize the rule""" super().__init__() self.list_supported_functions = [] self.singular_supported_functions = [] for intrinsic_type, intrinsic_value in ( RESOURCE_SPECS.get("us-east-1").get("IntrinsicTypes").items() ): if "List" in intrinsic_value.get("ReturnTypes", []): self.list_supported_functions.append(intrinsic_type) if "Singular" in intrinsic_value.get("ReturnTypes", []): self.singular_supported_functions.append(intrinsic_type) def _get_parameters(self, cfn): """Get all Parameter Names""" results = {} parameters = cfn.template.get("Parameters", {}) if isinstance(parameters, dict): for param_name, param_values in parameters.items(): # This rule isn't here to check the Types but we need # something valid if it doesn't exist results[param_name] = param_values.get("Type", "String") return results def _normalize_getatt(self, getatt): """Normalize getatt into an array""" if isinstance(getatt, str): return getatt.split(".", 1) return getatt def _is_ref_a_list(self, parameter, template_parameters): """Is a Ref a list""" list_params = [ "AWS::NotificationARNs", ] if parameter in template_parameters: if template_parameters.get(parameter) in VALID_PARAMETER_TYPES_LIST: return True if parameter in list_params: return True return False def _is_getatt_a_list(self, parameter, get_atts): """Is a GetAtt a List""" for resource, attributes in get_atts.items(): for attribute_name, attribute_values in attributes.items(): if resource == parameter[0] and attribute_name == "*": if attribute_values.get("PrimitiveItemType"): return "FALSE" if attribute_values.get("Type") == "List": return "TRUE" return "UNKNOWN" if resource == parameter[0] and attribute_name == parameter[1]: if attribute_values.get("Type") == "List": return "TRUE" return "FALSE" def _match_string_objs(self, join_string_objs, cfn, path): """Check join list""" matches = [] template_parameters = self._get_parameters(cfn) get_atts = cfn.get_valid_getatts() if isinstance(join_string_objs, dict): if len(join_string_objs) == 1: for key, value in join_string_objs.items(): if key not in self.list_supported_functions: message = "Fn::Join unsupported function for {0}" matches.append( RuleMatch(path, message.format("/".join(map(str, path)))) ) elif key in ["Ref"]: if not self._is_ref_a_list(value, template_parameters): message = "Fn::Join must use a list at {0}" matches.append( RuleMatch( path, message.format("/".join(map(str, path))) ) ) elif key in ["Fn::GetAtt"]: if ( self._is_getatt_a_list( self._normalize_getatt(value), get_atts ) == "FALSE" ): message = "Fn::Join must use a list at {0}" matches.append( RuleMatch( path, message.format("/".join(map(str, path))) ) ) else: message = "Join list of values should be singular for {0}" matches.append( RuleMatch(path, message.format("/".join(map(str, path)))) ) elif not isinstance(join_string_objs, list): message = "Join list of values for {0}" matches.append(RuleMatch(path, message.format("/".join(map(str, path))))) else: for string_obj in join_string_objs: if isinstance(string_obj, dict): if len(string_obj) == 1: for key, value in string_obj.items(): if key not in self.singular_supported_functions: message = "Join unsupported function for {0}" matches.append( RuleMatch( path, message.format("/".join(map(str, path))) ) ) elif key in ["Ref"]: if self._is_ref_a_list(value, template_parameters): message = "Fn::Join must not be a list at {0}" matches.append( RuleMatch( path, message.format("/".join(map(str, path))), ) ) elif key in ["Fn::GetAtt"]: if ( self._is_getatt_a_list( self._normalize_getatt(value), get_atts ) == "TRUE" ): message = "Fn::Join must not be a list at {0}" matches.append( RuleMatch( path, message.format("/".join(map(str, path))), ) ) else: message = "Join list of values should be singular for {0}" matches.append( RuleMatch(path, message.format("/".join(map(str, path)))) ) elif not isinstance(string_obj, str): message = "Join list of singular function or string for {0}" matches.append( RuleMatch(path, message.format("/".join(map(str, path)))) ) return matches def match(self, cfn): matches = [] join_objs = cfn.search_deep_keys("Fn::Join") for join_obj in join_objs: join_value_obj = join_obj[-1] path = join_obj[:-1] if isinstance(join_value_obj, list): if len(join_value_obj) == 2: join_string = join_value_obj[0] join_string_objs = join_value_obj[1] if not isinstance(join_string, str): message = "Join string has to be of type string for {0}" matches.append( RuleMatch(path, message.format("/".join(map(str, path)))) ) matches.extend(self._match_string_objs(join_string_objs, cfn, path)) else: message = "Join should be an array of 2 for {0}" matches.append( RuleMatch(path, message.format("/".join(map(str, path)))) ) else: message = "Join should be an array of 2 for {0}" matches.append( RuleMatch(path, message.format("/".join(map(str, path)))) ) return matches