AWSTemplateFormatVersion: "2010-09-09" Description: "AWS Step Functions Human based task example. It sends an email with an HTTP URL for approval." Parameters: Email: Type: String AllowedPattern: "^[a-zA-Z0-9_.+-]+@[a-zA-Z0-9-]+\\.[a-zA-Z0-9-.]+$" ConstraintDescription: Must be a valid email address. ContainerImage: Type: String s3Bucket: Type: String SageMakerExecutionRoleArn: Type: String Resources: # Begin API Gateway Resources ExecutionApi: Type: AWS::ApiGateway::RestApi Properties: Name: "Human approval endpoint" Description: "HTTP Endpoint backed by API Gateway and Lambda" FailOnWarnings: true Tags: - Key: CreatedFor Value: reInvent2020 AIM404 Demo ExecutionResource: Type: AWS::ApiGateway::Resource Properties: RestApiId: !Ref ExecutionApi ParentId: !GetAtt "ExecutionApi.RootResourceId" PathPart: execution ExecutionMethod: Type: AWS::ApiGateway::Method Properties: AuthorizationType: NONE HttpMethod: GET Integration: Type: AWS IntegrationHttpMethod: POST Uri: !Sub "arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${LambdaApprovalFunction.Arn}/invocations" IntegrationResponses: - StatusCode: 302 ResponseParameters: method.response.header.Location: "integration.response.body.headers.Location" RequestTemplates: application/json: | { "body" : $input.json('$'), "headers": { #foreach($header in $input.params().header.keySet()) "$header": "$util.escapeJavaScript($input.params().header.get($header))" #if($foreach.hasNext),#end #end }, "method": "$context.httpMethod", "params": { #foreach($param in $input.params().path.keySet()) "$param": "$util.escapeJavaScript($input.params().path.get($param))" #if($foreach.hasNext),#end #end }, "query": { #foreach($queryParam in $input.params().querystring.keySet()) "$queryParam": "$util.escapeJavaScript($input.params().querystring.get($queryParam))" #if($foreach.hasNext),#end #end } } ResourceId: !Ref ExecutionResource RestApiId: !Ref ExecutionApi MethodResponses: - StatusCode: 302 ResponseParameters: method.response.header.Location: true ApiGatewayAccount: Type: AWS::ApiGateway::Account Properties: CloudWatchRoleArn: !GetAtt "ApiGatewayCloudWatchLogsRole.Arn" ApiGatewayCloudWatchLogsRole: Type: AWS::IAM::Role Properties: Tags: - Key: CreatedFor Value: reInvent2020 AIM404 Demo AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - Action: - 'sts:AssumeRole' Policies: - PolicyName: ApiGatewayLogsPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - "logs:*" Resource: !Sub "arn:${AWS::Partition}:logs:*:*:*" ExecutionApiStage: DependsOn: - ApiGatewayAccount Type: AWS::ApiGateway::Stage Properties: DeploymentId: !Ref ApiDeployment MethodSettings: - DataTraceEnabled: true HttpMethod: '*' LoggingLevel: INFO ResourcePath: /* RestApiId: !Ref ExecutionApi StageName: states Tags: - Key: CreatedFor Value: reInvent2020 AIM404 Demo ApiDeployment: Type: AWS::ApiGateway::Deployment DependsOn: - ExecutionMethod Properties: RestApiId: !Ref ExecutionApi StageName: DummyStage # End API Gateway Resources # Begin # Lambda that will be invoked by API Gateway LambdaApprovalFunction: Type: AWS::Lambda::Function Properties: Code: ZipFile: Fn::Sub: | const AWS = require('aws-sdk'); var redirectToStepFunctions = function(lambdaArn, statemachineName, executionName, callback) { const lambdaArnTokens = lambdaArn.split(":"); const partition = lambdaArnTokens[1]; const region = lambdaArnTokens[3]; const accountId = lambdaArnTokens[4]; console.log("partition=" + partition); console.log("region=" + region); console.log("accountId=" + accountId); const executionArn = "arn:" + partition + ":states:" + region + ":" + accountId + ":execution:" + statemachineName + ":" + executionName; console.log("executionArn=" + executionArn); const url = "" + region + "#/executions/details/" + executionArn; callback(null, { statusCode: 302, headers: { Location: url } }); }; exports.handler = (event, context, callback) => { console.log('Event= ' + JSON.stringify(event)); const action = event.query.action; const taskToken = event.query.taskToken; const statemachineName =; const executionName = event.query.ex; const sagemakerTrainingJobName = event.query.trainingJobName; const modelArtifacts = event.query.modelArtifacts; const stepfunctions = new AWS.StepFunctions(); var message = ""; if (action === "approve") { message = { "Status": "Approved! Model approved by ${Email}", "TrainingJobName": sagemakerTrainingJobName, "ModelArtifacts": modelArtifacts }; } else if (action === "reject") { message = { "Status": "Rejected! Model rejected by ${Email}" }; } else { console.error("Unrecognized action. Expected: approve, reject."); callback({"Status": "Failed to process the request. Unrecognized Action."}); } stepfunctions.sendTaskSuccess({ output: JSON.stringify(message), taskToken: event.query.taskToken }) .promise() .then(function(data) { redirectToStepFunctions(context.invokedFunctionArn, statemachineName, executionName, callback); }).catch(function(err) { console.error(err, err.stack); callback(err); }); } Description: Lambda function that callback to AWS Step Functions Handler: index.handler Role: !GetAtt "LambdaApiGatewayIAMRole.Arn" Runtime: nodejs12.x Tags: - Key: CreatedFor Value: reInvent2020 AIM404 Demo LambdaApiGatewayInvoke: Type: AWS::Lambda::Permission Properties: Action: "lambda:InvokeFunction" FunctionName: !GetAtt "LambdaApprovalFunction.Arn" Principal: "" SourceArn: !Sub "arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${ExecutionApi}/*" LambdaApiGatewayIAMRole: Type: AWS::IAM::Role Properties: Tags: - Key: CreatedFor Value: reInvent2020 AIM404 Demo AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Action: - "sts:AssumeRole" Effect: "Allow" Principal: Service: - "" Policies: - PolicyName: CloudWatchLogsPolicy PolicyDocument: Statement: - Effect: Allow Action: - "logs:*" Resource: !Sub "arn:${AWS::Partition}:logs:*:*:*" - PolicyName: StepFunctionsPolicy PolicyDocument: Statement: - Effect: Allow Action: - "states:SendTaskFailure" - "states:SendTaskSuccess" Resource: "*" # End Lambda that will be invoked by API Gateway # Begin state machine that publishes to Lambda and sends an email with the link for approval HumanApprovalLambdaStateMachine: DependsOn: - LambdaApprovalFunction - LambdaHumanApprovalSendEmailFunction Type: AWS::StepFunctions::StateMachine Properties: Tags: - Key: CreatedFor Value: reInvent2020 AIM404 Demo RoleArn: !GetAtt LambdaStateMachineExecutionRole.Arn DefinitionString: Fn::Sub: | { "StartAt": "Train model (r-fable-forecasting)", "States": { "Train model (r-fable-forecasting)": { "Type": "Task", "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync", "Parameters": { "AlgorithmSpecification": { "TrainingImage": "${ContainerImage}", "TrainingInputMode": "File" }, "OutputDataConfig": { "S3OutputPath": "s3://${s3Bucket}/output/r-fable-trip-forecasting" }, "StoppingCondition": { "MaxRuntimeInSeconds": 3600 }, "ResourceConfig": { "InstanceCount": 1, "InstanceType": "ml.m4.xlarge", "VolumeSizeInGB": 5 }, "RoleArn": "${SageMakerExecutionRoleArn}", "InputDataConfig":[ { "ChannelName": "train", "DataSource":{ "S3DataSource":{ "S3DataType": "S3Prefix", "S3Uri.$": "States.Format('s3://{}/{}', $.detail.requestParameters.bucketName, $.detail.requestParameters.key)", "S3DataDistributionType":"FullyReplicated" } } } ], "HyperParameters": { "city.$": "$.city", "ic": "aic", "ets_trend_method": "A" }, "TrainingJobName.$": "States.Format('r-fable-{}-{}', $.city, $.eventID)" }, "Catch":[ { "ErrorEquals":[ "States.ALL" ], "ResultPath":"$.cause", "Next":"Workflow Error" } ], "Next": "Evaluate model" }, "Evaluate model": { "Type": "Task", "Resource": "arn:aws:states:::sagemaker:createProcessingJob.sync", "Parameters": { "AppSpecification": { "ImageUri": "${ContainerImage}", "ContainerEntrypoint": ["/usr/bin/Rscript", "/opt/fable_sagemaker.r", "evaluate"], "ContainerArguments.$": "States.Array($" }, "ProcessingInputs": [ { "InputName": "model-input", "S3Input": { "LocalPath": "/opt/ml/processing/input", "S3DataDistributionType": "FullyReplicated", "S3DataType": "S3Prefix", "S3InputMode": "File", "S3Uri.$": "$.ModelArtifacts.S3ModelArtifacts" } } ], "ProcessingOutputConfig": { "Outputs": [ { "OutputName": "evaluation-output", "S3Output": { "LocalPath": "/opt/ml/processing/output", "S3UploadMode": "EndOfJob", "S3Uri.$": "States.Format('s3://${s3Bucket}/output/r-fable-trip-forecasting/{}', $.TrainingJobName)" } } ] }, "ProcessingResources": { "ClusterConfig": { "InstanceCount": 1, "InstanceType": "ml.t3.large", "VolumeSizeInGB": 5 } }, "RoleArn": "${SageMakerExecutionRoleArn}", "ProcessingJobName.$": "$.TrainingJobName" }, "Catch":[ { "ErrorEquals":[ "States.ALL" ], "ResultPath":"$.cause", "Next":"Workflow Error" } ], "Next": "Send email for approval" }, "Workflow Error": { "Type": "Fail", "Error": "ErrorCode", "Cause": "Caused By Message" }, "Send email for approval":{ "Type": "Task", "Resource": "arn:${AWS::Partition}:states:::lambda:invoke.waitForTaskToken", "Parameters": { "FunctionName": "${LambdaHumanApprovalSendEmailFunction.Arn}", "Payload": { "EvaluationReport.$": "States.Format('{}/forecast-report.png', $.ProcessingOutputConfig.Outputs[0].S3Output.S3Uri)", "EvaluationReportBucket": "${s3Bucket}", "EvaluationReportKey.$": "States.Format('output/r-fable-trip-forecasting/{}/forecast-report.png', $.ProcessingJobName)", "TrainingJobName.$": "$.ProcessingJobName", "ModelArtifacts.$": "$.ProcessingInputs[0].S3Input.S3Uri", "ExecutionContext.$": "$$", "APIGatewayEndpoint": "https://${ExecutionApi}.execute-api.${AWS::Region}" } }, "Catch":[ { "ErrorEquals":[ "States.ALL" ], "ResultPath":"$.cause", "Next":"Workflow Error" } ], "Next": "ManualApprovalChoiceState" }, "ManualApprovalChoiceState": { "Type": "Choice", "Choices": [ { "Variable": "$.Status", "StringEquals": "Approved! Model approved by ${Email}", "Next": "Approved Model" }, { "Variable": "$.Status", "StringEquals": "Rejected! Model rejected by ${Email}", "Next": "Rejected Model" } ], "Default": "Rejected Model" }, "Approved Model": { "Type": "Pass", "Next": "Save Model" }, "Rejected Model": { "Type": "Pass", "End": true }, "Save Model": { "Parameters": { "PrimaryContainer": { "Image": "${ContainerImage}", "Environment": {}, "ModelDataUrl.$": "$.ModelArtifacts" }, "ExecutionRoleArn": "${SageMakerExecutionRoleArn}", "ModelName.$": "$.TrainingJobName" }, "Resource": "arn:aws:states:::sagemaker:createModel", "Type": "Task", "End": true } } } LambdaHumanApprovalSendEmailFunction: Type: AWS::Lambda::Function Properties: Tags: - Key: CreatedFor Value: reInvent2020 AIM404 Demo Handler: "index.lambda_handler" Role: !GetAtt LambdaSendEmailExecutionRole.Arn Runtime: "nodejs12.x" Timeout: "25" Code: ZipFile: Fn::Sub: | const AWS = require('aws-sdk'); AWS.config.update({signatureVersion: 'v4'}); exports.lambda_handler = (event, context, callback) => { console.log('event= ' + JSON.stringify(event)); console.log('context= ' + JSON.stringify(context)); const executionContext = event.ExecutionContext; const executionName = executionContext.Execution.Name; const sagemakerTraingingJob = event.TrainingJobName; const modelArtifacts = event.ModelArtifacts; const evaluationReportBucket = event.EvaluationReportBucket; const evaluationReportKey = event.EvaluationReportKey; const statemachineName = executionContext.StateMachine.Name; const taskToken = executionContext.Task.Token; const apigwEndpint = event.APIGatewayEndpoint; const approveEndpointUrl = apigwEndpint + "/execution?action=approve&ex=" + executionName + "&sm=" + statemachineName + "&trainingJobName=" + sagemakerTraingingJob + "&modelArtifacts=" + encodeURIComponent(modelArtifacts) + "&taskToken=" + encodeURIComponent(taskToken); console.log('approveEndpoint= ' + approveEndpointUrl); const rejectEndpointUrl = apigwEndpint + "/execution?action=reject&ex=" + executionName + "&sm=" + statemachineName + "&taskToken=" + encodeURIComponent(taskToken); console.log('rejectEndpoint= ' + rejectEndpointUrl); // presign the report png for review // var s3 = new AWS.S3(); var s3params = {Bucket: evaluationReportBucket, Key: evaluationReportKey, Expires: 86400}; var evaluationReportUrl = s3.getSignedUrl('getObject', s3params); console.log('The Evaluation Report URL is', evaluationReportUrl); var emailMessageHtml = '


'; const ses = new AWS.SES({apiVersion: '2010-12-01'}); var params = { Destination: {ToAddresses:["${Email}"]}, Message: {Body: {Html: {Charset:'UTF-8', Data: emailMessageHtml}}, Subject: {Charset: 'UTF-8', Data: 'Required approval from AWS Step Functions'}}, Source: "${Email}" }; ses.sendEmail(params) .promise() .then(function(data) { console.log("MessageID is " + data.MessageId); callback(null); }).catch( function(err) { console.error(err, err.stack); callback(err); }); }; LambdaVerifySESEmail: Type: AWS::Lambda::Function Properties: Tags: - Key: CreatedFor Value: reInvent2020 AIM404 Demo Handler: "index.lambda_handler" Role: !GetAtt LambdaSendEmailExecutionRole.Arn Runtime: "nodejs12.x" Timeout: "25" Code: ZipFile: Fn::Sub: | var AWS = require('aws-sdk'); var response = require('cfn-response'); exports.lambda_handler = (event, context, callback) => { // For Delete requests, immediately send a SUCCESS response. if (event.RequestType == "Delete") { response.send(event, context, "SUCCESS"); return; } var responseStatus = "FAILED"; var responseData = {}; const ses = new AWS.SES({apiVersion: '2010-12-01'}); var params = {EmailAddress: "${Email}"}; ses.verifyEmailIdentity(params, function(err, data) { if (err) { console.log(err, err.stack); responseData = {Error: "SES email verification failed"}; } // an error occurred else { console.log(data); responseStatus = "SUCCESS"; } // successful response response.send(event, context, responseStatus, responseData); }); }; LambdaCustomExecution: Type: AWS::CloudFormation::CustomResource Version: "1.0" Properties: ServiceToken: !GetAtt LambdaVerifySESEmail.Arn LambdaStateMachineExecutionRole: Type: AWS::IAM::Role Properties: Tags: - Key: CreatedFor Value: reInvent2020 AIM404 Demo AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: Action: "sts:AssumeRole" Policies: - PolicyName: XRayAccessPolicy PolicyDocument: Statement: - Effect: Allow Action: - "xray:PutTraceSegments" - "xray:PutTelemetryRecords" - "xray:GetSamplingRules" - "xray:GetSamplingTargets" Resource: - "*" - PolicyName: InvokeCallbackLambda PolicyDocument: Statement: - Effect: Allow Action: - "lambda:InvokeFunction" Resource: - !Sub "${LambdaHumanApprovalSendEmailFunction.Arn}" - !Sub "${LambdaHumanApprovalSendEmailFunction.Arn}:*" - PolicyName: CloudWatchLogsDelivery PolicyDocument: Statement: - Effect: Allow Action: - "logs:CreateLogDelivery" - "logs:GetLogDelivery" - "logs:UpdateLogDelivery" - "logs:DeleteLogDelivery" - "logs:ListLogDeliveries" - "logs:PutResourcePolicy" - "logs:DescribeResourcePolicies" - "logs:DescribeLogGroups" Resource: - !Sub "arn:${AWS::Partition}:logs:*:*:*" - PolicyName: InvokeSageMakerJobs PolicyDocument: Statement: - Effect: Allow Action: - "sagemaker:CreateProcessingJob" - "sagemaker:DescribeProcessingJob" - "sagemaker:StopProcessingJob" Resource: - !Sub "arn:aws:sagemaker:${AWS::Region}:${AWS::AccountId}:processing-job/*" - Effect: Allow Action: - "sagemaker:CreateTrainingJob" - "sagemaker:DescribeTrainingJob" - "sagemaker:StopTrainingJob" Resource: - !Sub "arn:aws:sagemaker:${AWS::Region}:${AWS::AccountId}:training-job/*" - Effect: Allow Action: - "sagemaker:CreateModel" Resource: - !Sub "arn:aws:sagemaker:${AWS::Region}:${AWS::AccountId}:model/*" - Effect: Allow Action: - "events:PutTargets" - "events:PutRule" - "events:DescribeRule" Resource: - !Sub "arn:aws:events:${AWS::Region}:${AWS::AccountId}:rule/StepFunctionsGetEventsForSageMakerProcessingJobsRule" - !Sub "arn:aws:events:${AWS::Region}:${AWS::AccountId}:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule" - Effect: Allow Action: - "sagemaker:ListTags" Resource: - "*" - Effect: Allow Action: - "iam:PassRole" Resource: - !Ref SageMakerExecutionRoleArn Condition: StringEquals: iam:PassedToService: "" LambdaSendEmailExecutionRole: Type: AWS::IAM::Role Properties: Tags: - Key: CreatedFor Value: reInvent2020 AIM404 Demo AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: Action: "sts:AssumeRole" Policies: - PolicyName: CloudWatchLogsPolicy PolicyDocument: Statement: - Effect: Allow Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:${AWS::Partition}:logs:*:*:*" - PolicyName: S3BucketPolicy PolicyDocument: Statement: - Effect: Allow Action: - "s3:GetObject" Resource: - !Sub "arn:aws:s3:::${s3Bucket}/*" - PolicyName: SESSendEmailPolicy PolicyDocument: Statement: - Effect: Allow Action: - "ses:SendEmail" - "ses:SendRawEmail" Resource: - !Sub "arn:aws:ses:${AWS::Region}:${AWS::AccountId}:identity/${Email}" - Effect: Allow Action: - "ses:VerifyEmailIdentity" Resource: - "*" EventBridgeInvokeSfnRole: Type: AWS::IAM::Role Properties: Tags: - Key: CreatedFor Value: reInvent2020 AIM404 Demo AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: Action: "sts:AssumeRole" Policies: - PolicyName: EventBridgeInvokeSfnRole PolicyDocument: Statement: - Effect: Allow Action: - "states:StartExecution" Resource: - !Sub "${HumanApprovalLambdaStateMachine}" EventRule: Type: AWS::Events::Rule Properties: Description: "training triggered by s3 object put." EventPattern: source: - "aws.s3" detail-type: - "AWS API Call via CloudTrail" detail: eventSource: - "" eventName: - "PutObject" requestParameters: bucketName: - !Ref s3Bucket key: - prefix: "r-fable-trip-forecasting/new-data" State: "ENABLED" Targets: - Arn: Fn::GetAtt: - "HumanApprovalLambdaStateMachine" - "Arn" Id: "HumanApprovalLambdaStateMachine" InputTransformer: InputPathsMap: detail: "$.detail" eventID: "$.id" resources: "$.resources" InputTemplate: '{"Comment": "Executed from EventBridge", "city": "Melbourne", "eventID": , "resources": , "detail":}' RoleArn: !Sub "${EventBridgeInvokeSfnRole.Arn}" S3CloudTrailBucket: Type: AWS::S3::Bucket Properties: Tags: - Key: CreatedFor Value: reInvent2020 AIM404 Demo S3CloudTrailBucketPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: Ref: S3CloudTrailBucket PolicyDocument: Version: "2012-10-17" Statement: - Sid: "AWSCloudTrailAclCheck" Effect: "Allow" Principal: Service: "" Action: "s3:GetBucketAcl" Resource: !Sub "arn:aws:s3:::${S3CloudTrailBucket}" - Sid: "AWSCloudTrailWrite" Effect: "Allow" Principal: Service: "" Action: "s3:PutObject" Resource: !Sub "arn:aws:s3:::${S3CloudTrailBucket}/AWSLogs/${AWS::AccountId}/*" Condition: StringEquals: s3:x-amz-acl: "bucket-owner-full-control" CloudTrailS3: DependsOn: S3CloudTrailBucketPolicy Type: AWS::CloudTrail::Trail Properties: EventSelectors: - DataResources: - Type: AWS::S3::Object Values: - !Sub "arn:aws:s3:::${s3Bucket}/" IncludeManagementEvents: false ReadWriteType: WriteOnly IncludeGlobalServiceEvents: true IsLogging: true IsMultiRegionTrail: true S3BucketName: Ref: S3CloudTrailBucket Tags: - Key: CreatedFor Value: reInvent2020 AIM404 Demo # End state machine that publishes to Lambda and sends an email with the link for approval Outputs: ApiGatewayInvokeURL: Value: !Sub "https://${ExecutionApi}.execute-api.${AWS::Region}" StateMachineHumanApprovalArn: Value: !Ref HumanApprovalLambdaStateMachine