""" Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. SPDX-License-Identifier: MIT-0 """ from cfnlint.rules import CloudFormationLintRule, RuleMatch class RuleScheduleExpression(CloudFormationLintRule): """Validate AWS Events Schedule expression format""" id = "E3027" shortdesc = "Validate AWS Event ScheduleExpression format" description = "Validate the formation of the AWS::Event ScheduleExpression" source_url = "https://docs.aws.amazon.com/AmazonCloudWatch/latest/events/ScheduledEvents.html" tags = ["resources", "events"] def initialize(self, cfn): """Initialize the rule""" self.resource_property_types = ["AWS::Events::Rule"] def check_rate(self, value, path): """Check Rate configuration""" matches = [] # Extract the expression from rate(XXX) rate_expression = value[value.find("(") + 1 : value.find(")")] if not rate_expression: matches.append( RuleMatch(path, "Rate value of ScheduleExpression cannot be empty") ) else: # Rate format: rate(Value Unit) items = rate_expression.split(" ") if len(items) != 2: message = "Rate expression must contain 2 elements (Value Unit), rate contains {} elements" matches.append(RuleMatch(path, message.format(len(items)))) else: # Check the Value if not items[0].isdigit(): message = "Rate Value ({}) should be of type Integer." extra_args = { "actual_type": type(items[0]).__name__, "expected_type": int.__name__, } matches.append( RuleMatch(path, message.format(items[0]), **extra_args) ) return matches def check_cron(self, value, path): """Check Cron configuration""" matches = [] # Extract the expression from cron(XXX) cron_expression = value[value.find("(") + 1 : value.find(")")] if not cron_expression: matches.append( RuleMatch(path, "Cron value of ScheduleExpression cannot be empty") ) else: # Rate format: cron(Minutes Hours Day-of-month Month Day-of-week Year) items = cron_expression.split(" ") if len(items) != 6: message = "Cron expression must contain 6 elements (Minutes Hours Day-of-month Month Day-of-week Year), cron contains {} elements" matches.append(RuleMatch(path, message.format(len(items)))) return matches _, _, day_of_month, _, day_of_week, _ = cron_expression.split(" ") if day_of_month != "?" and day_of_week != "?": matches.append( RuleMatch( path, "Don't specify the Day-of-month and Day-of-week fields in the same cron expression", ) ) return matches def check_value(self, value, path): """Count ScheduledExpression value""" matches = [] # Value is either "cron()" or "rate()" if value.startswith("rate(") and value.endswith(")"): matches.extend(self.check_rate(value, path)) elif value.startswith("cron(") and value.endswith(")"): matches.extend(self.check_cron(value, path)) else: message = "Invalid ScheduledExpression specified ({}). Value has to be either cron() or rate()" matches.append(RuleMatch(path, message.format(value))) return matches def match_resource_properties(self, properties, _, path, cfn): """Check CloudFormation Properties""" matches = [] matches.extend( cfn.check_value( obj=properties, key="ScheduleExpression", path=path[:], check_value=self.check_value, ) ) return matches