""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 """ import json from cfnlint.rules import CloudFormationLintRule, RuleMatch class StateMachine(CloudFormationLintRule): """Check State Machine Definition""" id = "E2532" shortdesc = "Check State Machine Definition for proper syntax" description = ( "Check the State Machine String Definition to make sure its JSON. " "Validate basic syntax of the file to determine validity." ) source_url = "https://docs.aws.amazon.com/AWSCloudFormation/latest/UserGuide/aws-resource-stepfunctions-statemachine.html" tags = ["resources", "stepfunctions"] def __init__(self): """Init""" super().__init__() self.resource_property_types.append("AWS::StepFunctions::StateMachine") def _check_state_json(self, def_json, state_name, path): """Check State JSON Definition""" matches = [] # https://docs.aws.amazon.com/step-functions/latest/dg/amazon-states-language-common-fields.html common_state_keys = [ "Next", "End", "Type", "Comment", "InputPath", "OutputPath", ] common_state_required_keys = [ "Type", ] state_key_types = { "Pass": ["Result", "ResultPath", "Parameters"], "Task": [ "Credentials", "Resource", "Parameters", "ResultPath", "ResultSelector", "Retry", "Catch", "TimeoutSeconds", "TimeoutSecondsPath", "Parameters", "HeartbeatSeconds", "HeartbeatSecondsPath", ], "Map": [ "MaxConcurrency", "Iterator", "ItemsPath", "ItemProcessor", "ItemSelector", "ResultPath", "ResultSelector", "Retry", "Catch", "Parameters", ], "Choice": ["Choices", "Default"], "Wait": ["Seconds", "Timestamp", "SecondsPath", "TimestampPath"], "Succeed": [], "Fail": ["Cause", "Error"], "Parallel": [ "Branches", "ResultPath", "ResultSelector", "Parameters", "Retry", "Catch", ], } state_required_types = { "Pass": [], "Task": ["Resource"], "Choice": ["Choices"], "Wait": [], "Succeed": [], "Fail": [], "Parallel": ["Branches"], } for req_key in common_state_required_keys: if req_key not in def_json: message = f"State Machine Definition required key ({req_key}) for State ({state_name}) is missing" matches.append(RuleMatch(path, message)) return matches state_type = def_json.get("Type") if state_type in state_key_types: for state_key, _ in def_json.items(): if state_key not in common_state_keys + state_key_types.get( state_type, [] ): message = f"State Machine Definition key ({state_key}) for State ({state_name}) of Type ({state_type}) is not valid" matches.append(RuleMatch(path, message)) for req_key in common_state_required_keys + state_required_types.get( state_type, [] ): if req_key not in def_json: message = f"State Machine Definition required key ({req_key}) for State ({state_name}) of Type ({state_type}) is missing" matches.append(RuleMatch(path, message)) return matches else: message = f"State Machine Definition Type ({state_type}) is not valid" matches.append(RuleMatch(path, message)) return matches def _check_definition_json(self, def_json, path): """Check JSON Definition""" matches = [] top_level_keys = ["Comment", "StartAt", "TimeoutSeconds", "Version", "States"] top_level_required_keys = ["StartAt", "States"] for top_key, _ in def_json.items(): if top_key not in top_level_keys: message = f"State Machine Definition key ({top_key}) is not valid" matches.append(RuleMatch(path, message)) for req_key in top_level_required_keys: if req_key not in def_json: message = ( f"State Machine Definition required key ({req_key}) is missing" ) matches.append(RuleMatch(path, message)) for state_name, state_value in def_json.get("States", {}).items(): matches.extend(self._check_state_json(state_value, state_name, path)) return matches def check_value(self, value, path, fail_on_loads=True): """Check Definition Value""" matches = [] try: def_json = json.loads(value) # pylint: disable=W0703 except Exception as err: if fail_on_loads: message = f"State Machine Definition needs to be formatted as JSON. Error {err}" matches.append(RuleMatch(path, message)) return matches self.logger.debug("State Machine definition could not be parsed. Skipping") return matches matches.extend(self._check_definition_json(def_json, path)) return matches def check_sub(self, value, path): """Check Sub Object""" matches = [] if isinstance(value, list): matches.extend(self.check_value(value[0], path, False)) elif isinstance(value, str): matches.extend(self.check_value(value, path, False)) return matches def match_resource_properties(self, properties, _, path, cfn): """Check CloudFormation Properties""" matches = [] matches.extend( cfn.check_value( obj=properties, key="DefinitionString", path=path[:], check_value=self.check_value, check_sub=self.check_sub, ) ) return matches