# AWS SAM template for the serverless scheduler.
#
# Resources defined here:
#   * one shared IAM role (LambdaRole) used by every Lambda function,
#   * the Lambda functions used as Task states by the state machine,
#   * three SQS queues (input, finished, failed) named after the stack,
#   * a DynamoDB table named '<stack>-job-monitoring',
#   * the Step Functions state machine (DeployStateMachine),
#   * an EventBridge rule that invokes TriggerStepFunction whenever an EC2
#     instance enters the 'pending' state.
AWSTemplateFormatVersion: "2010-09-09"
Description: >-
  Decoupled Serverless Scheduler To Run HPC Applications At Scale on EC2 -
  Serverless Scheduler (uksb-1q7ff025u)

Transform:
  - "AWS::Serverless-2016-10-31"

# Defaults applied to every AWS::Serverless::Function below.
Globals:
  Function:
    Handler: index.handler
    MemorySize: 512
    # python3.6 is a retired Lambda runtime and can no longer be used to
    # create or update functions, so the stack would fail to deploy.
    # NOTE(review): confirm the handlers under ./src run on this runtime.
    Runtime: python3.12
    Timeout: 420

Parameters:
  TimeoutJob:
    Description: >-
      This is the interval (in seconds) to poll SSM for job status where 5s is
      reasonable. If jobs run an hour or more then polling every 60 seconds
      might be better.
    Type: String
    # Quoted so the default is a string, matching the declared parameter type.
    Default: "5"

Resources:
  # Execution role shared by all Lambda functions in this template.
  # NOTE(review): every attached policy except the basic execution role is an
  # AWS-managed *FullAccess policy; consider narrowing to least privilege.
  LambdaRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - "sts:AssumeRole"
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole"
        - "arn:aws:iam::aws:policy/AmazonEC2FullAccess"
        - "arn:aws:iam::aws:policy/AmazonSQSFullAccess"
        - "arn:aws:iam::aws:policy/AmazonS3FullAccess"
        - "arn:aws:iam::aws:policy/AmazonSSMFullAccess"
        - "arn:aws:iam::aws:policy/AmazonSNSFullAccess"
        - "arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess"
        - "arn:aws:iam::aws:policy/AmazonDynamoDBFullAccess"

  # --- Lambda functions used as Task states by DeployStateMachine ---------

  AddSQS:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/add_sqs
      Role: !GetAtt LambdaRole.Arn

  CheckJob:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/check_job
      Role: !GetAtt LambdaRole.Arn

  ConfirmResult:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/confirm_result
      Role: !GetAtt LambdaRole.Arn

  # Backs the WriteJobStart, WriteJobSuccessful and WriteJobFailed states.
  DynamoDB:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/dynamo_db
      Role: !GetAtt LambdaRole.Arn

  FinalTerminate:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/final_terminate
      Role: !GetAtt LambdaRole.Arn

  GetJob:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/get_job
      Role: !GetAtt LambdaRole.Arn

  DeleteJob:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/delete_job
      Role: !GetAtt LambdaRole.Arn

  ExtractJob:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/extract_job
      Role: !GetAtt LambdaRole.Arn

  FailedJob:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/failed_job
      Role: !GetAtt LambdaRole.Arn

  # --- Job queues; all names are derived from the stack name --------------

  JobInputSQS:
    Type: "AWS::SQS::Queue"
    Properties:
      QueueName: !Sub '${AWS::StackName}-job-queue'

  JobOutputSQS:
    Type: "AWS::SQS::Queue"
    Properties:
      QueueName: !Sub '${AWS::StackName}-job-queue-finished'

  JobFailedSQS:
    Type: "AWS::SQS::Queue"
    Properties:
      QueueName: !Sub '${AWS::StackName}-job-queue-failed'

  # Backs both the ProtectEC2 and UnProtectEC2 states.
  ProtectEC2:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/protect_ec2
      Role: !GetAtt LambdaRole.Arn

  SQSCheck:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/sqs_check
      Role: !GetAtt LambdaRole.Arn

  SQSOut:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/sqs_out
      Role: !GetAtt LambdaRole.Arn

  StartJob:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/start_job
      Role: !GetAtt LambdaRole.Arn

  WaitEC2:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/wait_ec2
      Role: !GetAtt LambdaRole.Arn

  # Job monitoring table keyed by job_id; its name is also passed to
  # TriggerStepFunction as TABLENAME.
  DynamoTable:
    Type: "AWS::Serverless::SimpleTable"
    Properties:
      PrimaryKey:
        Name: job_id
        Type: String
      ProvisionedThroughput:
        ReadCapacityUnits: 10
        WriteCapacityUnits: 10
      SSESpecification:
        SSEEnabled: false
      TableName: !Sub '${AWS::StackName}-job-monitoring'
      Tags:
        Scheduler: !Sub '${AWS::StackName}'

  # Role assumable by EC2; exported below as IAMRoleForEC2Workers.
  WorkerRole:
    Type: "AWS::IAM::Role"
    Properties:
      RoleName: !Sub '${AWS::StackName}-ssm-access-role'
      AssumeRolePolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - ec2.amazonaws.com
            Action:
              - "sts:AssumeRole"
      Path: /
      ManagedPolicyArns:
        - "arn:aws:iam::aws:policy/service-role/AmazonEC2RoleforSSM"

  # Role assumed by Step Functions; it may only invoke the functions listed.
  StateMachineExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: !Sub "states.${AWS::Region}.amazonaws.com"
            Action:
              - "sts:AssumeRole"
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action: "lambda:InvokeFunction"
                Resource:
                  - !GetAtt AddSQS.Arn
                  - !GetAtt DeleteJob.Arn
                  - !GetAtt ExtractJob.Arn
                  - !GetAtt FailedJob.Arn
                  - !GetAtt WaitEC2.Arn
                  - !GetAtt SQSOut.Arn
                  - !GetAtt GetJob.Arn
                  - !GetAtt ConfirmResult.Arn
                  - !GetAtt StartJob.Arn
                  - !GetAtt CheckJob.Arn
                  - !GetAtt SQSCheck.Arn
                  - !GetAtt ProtectEC2.Arn
                  - !GetAtt DynamoDB.Arn
                  - !GetAtt FinalTerminate.Arn

  # Scheduler state machine. The definition is JSON inside a literal block
  # scalar, so it cannot carry comments; notes on it live here instead.
  #
  # Flow: WaitEC2 -> ProtectEC2 -> job loop (GetJob ... SQSCheck) ->
  # UnProtectEC2 -> FinalTerminate.
  #
  # Two Catch targets differ from a naive "jump to the next Choice" wiring,
  # because a Choice rule whose Variable path is missing from the state input
  # aborts the whole execution instead of falling through to Default:
  #   * ProtectEC2 catches into ProtectEC2Failed, a Pass state that writes a
  #     sentinel into $.ProtectEC2, so YesNoProtected (and later
  #     PassCleanEntry) always find that field.
  #   * CheckJob catches straight into Wait, which is where
  #     YesNoJobRunning's Default leads anyway; $.jobstatus does not exist
  #     yet when the first poll of a job fails.
  DeployStateMachine:
    Type: "AWS::StepFunctions::StateMachine"
    Properties:
      DefinitionString: !Sub |
        {
          "Comment": "The scheduler state machine has a 1to1 mapping with an EC2 instance and runs jobs from SQS queue on instance using SSM run commands on windows EC2 then shut down instance",
          "StartAt": "WaitEC2",
          "States": {
            "WaitEC2": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 3, "MaxAttempts": 3, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", "Next": "FinalTerminate" }
              ],
              "InputPath": "$",
              "Resource": "${WaitEC2.Arn}",
              "Next": "SetProtect",
              "ResultPath": "$.ec2start"
            },
            "SetProtect": {
              "Type": "Pass",
              "Result": "True",
              "ResultPath": "$.protect",
              "Next": "ProtectEC2"
            },
            "ProtectEC2": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 3, "MaxAttempts": 3, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", "Next": "ProtectEC2Failed" }
              ],
              "Resource": "${ProtectEC2.Arn}",
              "InputPath": "$",
              "ResultPath": "$.ProtectEC2",
              "Next": "YesNoProtected"
            },
            "ProtectEC2Failed": {
              "Type": "Pass",
              "Result": "Failed",
              "ResultPath": "$.ProtectEC2",
              "Next": "YesNoProtected"
            },
            "YesNoProtected": {
              "Type": "Choice",
              "Choices": [
                { "Variable": "$.ProtectEC2", "StringEquals": "200", "Next": "GetJob" },
                { "Variable": "$.input.autoscaling_group", "StringEquals": "nothing", "Next": "GetJob" }
              ],
              "Default": "SetUnProtect"
            },
            "GetJob": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", "Next": "SQSCheck" }
              ],
              "Resource": "${GetJob.Arn}",
              "InputPath": "$",
              "ResultPath": "$.raw_message",
              "Next": "DeleteJob"
            },
            "DeleteJob": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.error-info", "Next": "ExtractJob" }
              ],
              "Resource": "${DeleteJob.Arn}",
              "InputPath": "$",
              "ResultPath": "$.delete_message",
              "Next": "ExtractJob"
            },
            "ExtractJob": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.error-info", "Next": "FailedJob" }
              ],
              "Resource": "${ExtractJob.Arn}",
              "InputPath": "$.raw_message",
              "ResultPath": "$.job_details",
              "Next": "StartJob"
            },
            "StartJob": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", "Next": "StatusFailed" }
              ],
              "Resource": "${StartJob.Arn}",
              "InputPath": "$",
              "Next": "StatusStarted",
              "ResultPath": "$.CommandId"
            },
            "StatusStarted": {
              "Type": "Pass",
              "Result": "Started",
              "ResultPath": "$.status",
              "Next": "WriteJobStart"
            },
            "WriteJobStart": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", "Next": "Wait" }
              ],
              "Resource": "${DynamoDB.Arn}",
              "InputPath": "$",
              "Next": "Wait",
              "ResultPath": "$.DynamoDB"
            },
            "Wait": {
              "Type": "Wait",
              "InputPath": "$",
              "SecondsPath": "$.input.Timeout_Job",
              "Next": "CheckJob"
            },
            "CheckJob": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 3, "MaxAttempts": 3, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", "Next": "Wait" }
              ],
              "Resource": "${CheckJob.Arn}",
              "InputPath": "$",
              "Next": "YesNoJobRunning",
              "ResultPath": "$.jobstatus"
            },
            "YesNoJobRunning": {
              "Type": "Choice",
              "Choices": [
                { "Variable": "$.jobstatus[0]", "StringEquals": "pending", "Next": "Wait" },
                { "Variable": "$.jobstatus[0]", "StringEquals": "failed", "Next": "StatusFailed" },
                { "Variable": "$.jobstatus[0]", "StringEquals": "success", "Next": "ConfirmResult" }
              ],
              "Default": "Wait"
            },
            "ConfirmResult": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 3, "MaxAttempts": 3, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", "Next": "StatusFailed" }
              ],
              "Resource": "${ConfirmResult.Arn}",
              "InputPath": "$",
              "ResultPath": "$.ConfirmResultFile",
              "Next": "YesNoResultFile"
            },
            "YesNoResultFile": {
              "Type": "Choice",
              "Choices": [
                { "Variable": "$.ConfirmResultFile", "StringEquals": "success", "Next": "StatusSuccessful" },
                { "Variable": "$.ConfirmResultFile", "StringEquals": "fail", "Next": "StatusFailed" }
              ],
              "Default": "ConfirmResult"
            },
            "StatusFailed": {
              "Type": "Pass",
              "Result": "Failed",
              "ResultPath": "$.status",
              "Next": "WriteJobFailed"
            },
            "StatusSuccessful": {
              "Type": "Pass",
              "Result": "Successful",
              "ResultPath": "$.status",
              "Next": "WriteJobSuccessful"
            },
            "WriteJobSuccessful": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", "Next": "SQSOut" }
              ],
              "Resource": "${DynamoDB.Arn}",
              "InputPath": "$",
              "Next": "SQSOut",
              "ResultPath": "$.DynamoDB"
            },
            "WriteJobFailed": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", "Next": "AddSQS" }
              ],
              "Resource": "${DynamoDB.Arn}",
              "InputPath": "$",
              "Next": "AddSQS",
              "ResultPath": "$.DynamoDB"
            },
            "AddSQS": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 3, "MaxAttempts": 3, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", "Next": "FailedJob" }
              ],
              "Resource": "${AddSQS.Arn}",
              "InputPath": "$",
              "ResultPath": "$.message",
              "Next": "YesNoRetryFinished"
            },
            "YesNoRetryFinished": {
              "Type": "Choice",
              "Choices": [
                { "Variable": "$.message", "StringEquals": "NoRetries", "Next": "FailedJob" }
              ],
              "Default": "SQSCheck"
            },
            "SQSOut": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 3, "MaxAttempts": 5, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", "Next": "FailedJob" }
              ],
              "Resource": "${SQSOut.Arn}",
              "InputPath": "$",
              "ResultPath": "$.message",
              "Next": "SQSCheck"
            },
            "FailedJob": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 2, "MaxAttempts": 3, "BackoffRate": 2 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.error-info", "Next": "SQSFail" }
              ],
              "Resource": "${FailedJob.Arn}",
              "InputPath": "$",
              "ResultPath": "$.message",
              "Next": "SQSCheck"
            },
            "SQSCheck": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 7, "MaxAttempts": 5, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", "Next": "SetUnProtect" }
              ],
              "Resource": "${SQSCheck.Arn}",
              "InputPath": "$",
              "Next": "YesNoSQSEmpty",
              "ResultPath": "$.SQSstatus"
            },
            "YesNoSQSEmpty": {
              "Type": "Choice",
              "Choices": [
                { "Variable": "$.SQSstatus", "StringEquals": "Zero", "Next": "SetUnProtect" },
                { "Variable": "$.SQSstatus", "StringEquals": "NotZero", "Next": "PassCleanEntry" }
              ],
              "Default": "SQSCheck"
            },
            "PassCleanEntry": {
              "Type": "Pass",
              "Parameters": {
                "input.$": "$.input",
                "ec2start.$": "$.ec2start",
                "protect.$": "$.protect",
                "ProtectEC2.$": "$.ProtectEC2"
              },
              "Next": "GetJob"
            },
            "SetUnProtect": {
              "Type": "Pass",
              "Result": "False",
              "ResultPath": "$.protect",
              "Next": "UnProtectEC2"
            },
            "UnProtectEC2": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 10, "MaxAttempts": 5, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["States.ALL"], "ResultPath": "$.errorInfo", "Next": "FinalTerminate" }
              ],
              "Resource": "${ProtectEC2.Arn}",
              "InputPath": "$",
              "ResultPath": "$.ProtectEC2",
              "Next": "FinalTerminate"
            },
            "FinalTerminate": {
              "Type": "Task",
              "Retry": [
                { "ErrorEquals": ["States.ALL"], "IntervalSeconds": 10, "MaxAttempts": 5, "BackoffRate": 2.0 }
              ],
              "Catch": [
                { "ErrorEquals": ["ClientError", "TypeError"], "ResultPath": "$.errorInfo", "Next": "Finish" }
              ],
              "Resource": "${FinalTerminate.Arn}",
              "InputPath": "$",
              "ResultPath": "$.finalterminate",
              "Next": "Finish"
            },
            "Finish": {
              "Type": "Succeed"
            },
            "SQSFail": {
              "Type": "Fail",
              "Cause": "SQSAdd did not put job back on queue",
              "Error": "Error in AddSQS"
            }
          }
        }
      RoleArn: !GetAtt StateMachineExecutionRole.Arn

  # Invoked by EventRule; receives the queue names, table name, state machine
  # ARN/name and polling interval through environment variables.
  TriggerStepFunction:
    Type: "AWS::Serverless::Function"
    Properties:
      CodeUri: ./src/trigger_step_function
      Role: !GetAtt LambdaRole.Arn
      Environment:
        Variables:
          TAGKEY: !Sub '${AWS::StackName}'
          REGION: !Ref "AWS::Region"
          SQSINPUTNAME: !Sub '${AWS::StackName}-job-queue'
          SQSOUTPUTNAME: !Sub '${AWS::StackName}-job-queue-finished'
          STATEMACHINEARN: !Ref DeployStateMachine
          STATEMACHINENAME: !GetAtt DeployStateMachine.Name
          TABLENAME: !Sub '${AWS::StackName}-job-monitoring'
          TIMEOUTJOB: !Ref TimeoutJob

  # Matches every EC2 state-change notification with state 'pending'; the
  # pattern itself does not filter by tag.
  EventRule:
    Type: "AWS::Events::Rule"
    Properties:
      Description: EventRule
      EventPattern:
        detail:
          state:
            - pending
        detail-type:
          - EC2 Instance State-change Notification
        source:
          - aws.ec2
      State: ENABLED
      Targets:
        - Arn: !GetAtt TriggerStepFunction.Arn
          Id: TargetFunctionV1

  # Lets the rule above invoke TriggerStepFunction.
  PermissionForEventsToInvokeLambda:
    Type: "AWS::Lambda::Permission"
    Properties:
      Action: "lambda:InvokeFunction"
      FunctionName: !Ref TriggerStepFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt EventRule.Arn

Outputs:
  TagKey:
    Description: >-
      EC2 TAG KEY - The tag key you will need to use to associate an EC2
      instance with this serverless scheduler, to avoid duplication stack name
      is used
    Value: !Sub '${AWS::StackName}'

  SQSInputQueue:
    Description: >-
      The name of the default queue to submit jobs to and needed as EC2 tag
      value if using default queue.
    Value: !Sub '${AWS::StackName}-job-queue'

  DynamoDBTable:
    Description: "DynamoDB table where you can monitor jobs"
    Value: !Ref 'DynamoTable'

  SQSFinishedQueue:
    Description: "SQS default queue where successful jobs land"
    Value: !Ref 'JobOutputSQS'

  SQSFailedQueue:
    Description: "SQS default queue where failed jobs land"
    Value: !Ref 'JobFailedSQS'

  IAMRoleForEC2Workers:
    Description: >-
      This IAM role needs to be used for EC2 Workers for SSM to send jobs, add
      more policies to access other services from EC2 such as S3, EFS, or FSx
    Value: !Ref 'WorkerRole'