AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: Create the initial ML Inference workflow and shadow deployment pipeline

# Defaults applied to every AWS::Serverless::Function below. The two
# AWS::Lambda::Function resources set their own Handler/Runtime explicitly.
# NOTE(review): python3.8 (here) and nodejs12.x (inline functions below) are
# believed to be past Lambda's end-of-support dates - confirm target runtimes.
# The inline Node.js handlers use require('aws-sdk') (SDK v2); verify that the
# chosen Node.js runtime still bundles it before bumping the version.
Globals:
  Function:
    Runtime: python3.8
    Handler: handler.lambda_handler

Parameters:
  ModelUrl:
    Type: String
    Description: Production Model Artifact S3 URL
    Default: "s3://{{resolve:ssm:code-bucket:1}}/model/model.tar.gz"
  DataUrl:
    Type: String
    Description: Test Data S3 URL
    Default: "s3://{{resolve:ssm:code-bucket:1}}/data/test.csv"
  WaitSeconds:
    Type: Number
    Description: Wait seconds for model performance comparison
    Default: 300 #3600
  Email:
    Type: String
    Description: Email address to subscribe for Manual Approval Notifications
    Default: example@email.com

# Per-region ECR image URI for the XGBoost container, looked up with
# !FindInMap [RegionMap, !Ref "AWS::Region", "XGBoostImage"].
Mappings:
  RegionMap:
    "us-west-1":
      "XGBoostImage": "632365934929.dkr.ecr.us-west-1.amazonaws.com/xgboost:1"
    "us-west-2":
      "XGBoostImage": "433757028032.dkr.ecr.us-west-2.amazonaws.com/xgboost:1"
    "us-east-1":
      "XGBoostImage": "811284229777.dkr.ecr.us-east-1.amazonaws.com/xgboost:1"
    "us-east-2":
      "XGBoostImage": "825641698319.dkr.ecr.us-east-2.amazonaws.com/xgboost:1"
    "ap-northeast-1":
      "XGBoostImage": "501404015308.dkr.ecr.ap-northeast-1.amazonaws.com/xgboost:1"
    "ap-northeast-2":
      "XGBoostImage": "306986355934.dkr.ecr.ap-northeast-2.amazonaws.com/xgboost:1"
    "ap-southeast-1":
      "XGBoostImage": "475088953585.dkr.ecr.ap-southeast-1.amazonaws.com/xgboost:1"
    "ap-southeast-2":
      "XGBoostImage": "544295431143.dkr.ecr.ap-southeast-2.amazonaws.com/xgboost:1"
    "ap-south-1":
      "XGBoostImage": "991648021394.dkr.ecr.ap-south-1.amazonaws.com/xgboost:1"
    "ap-east-1":
      "XGBoostImage": "286214385809.dkr.ecr.ap-east-1.amazonaws.com/xgboost:1"
    "ca-central-1":
      "XGBoostImage": "469771592824.dkr.ecr.ca-central-1.amazonaws.com/xgboost:1"
    "cn-north-1":
      "XGBoostImage": "390948362332.dkr.ecr.cn-north-1.amazonaws.com.cn/xgboost:1"
    "cn-northwest-1":
      "XGBoostImage": "387376663083.dkr.ecr.cn-northwest-1.amazonaws.com.cn/xgboost:1"
    "eu-central-1":
      "XGBoostImage": "813361260812.dkr.ecr.eu-central-1.amazonaws.com/xgboost:1"
    "eu-north-1":
      "XGBoostImage": "669576153137.dkr.ecr.eu-north-1.amazonaws.com/xgboost:1"
    "eu-west-1":
      "XGBoostImage": "685385470294.dkr.ecr.eu-west-1.amazonaws.com/xgboost:1"
    "eu-west-2":
      "XGBoostImage": "644912444149.dkr.ecr.eu-west-2.amazonaws.com/xgboost:1"
    "eu-west-3":
      "XGBoostImage": "749696950732.dkr.ecr.eu-west-3.amazonaws.com/xgboost:1"
    "me-south-1":
      "XGBoostImage": "249704162688.dkr.ecr.me-south-1.amazonaws.com/xgboost:1"
    "sa-east-1":
      "XGBoostImage": "855470959533.dkr.ecr.sa-east-1.amazonaws.com/xgboost:1"
    "us-gov-west-1":
      "XGBoostImage": "226302683700.dkr.ecr.us-gov-west-1.amazonaws.com/xgboost:1"

Resources:
  # ---------------------------------------------------------------------------
  # Model registry bucket and the trigger that starts the workflow
  # ---------------------------------------------------------------------------

  # Bucket name = fixed prefix + "-" + the first "-"-separated token of the
  # third "/"-separated field of the stack ID, so each stack gets its own name.
  ModelBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketName: !Join
        - "-"
        - - "shadow-deployment-model-registry-bucket"
          - !Select
            - 0
            - !Split
              - "-"
              - !Select
                - 2
                - !Split
                  - "/"
                  - !Ref "AWS::StackId"

  # Invoked for every object created in ModelBucket whose key ends in
  # "model.tar.gz"; it is allowed to start ShadowDeploymentSFN executions.
  TriggerLambda:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: shadow-deployment-sfn-trigger
      CodeUri: lambda/sfn_trigger
      Policies:
        - Statement:
            - Effect: Allow
              Action: states:StartExecution
              Resource: !Ref ShadowDeploymentSFN
      Environment:
        Variables:
          STATE_MACHINE_ARN: !Ref ShadowDeploymentSFN
          WAIT_SECONDS: !Ref WaitSeconds
      Events:
        S3Event:
          Type: S3
          Properties:
            Bucket: !Ref ModelBucket
            Events: s3:ObjectCreated:*
            Filter:
              S3Key:
                Rules:
                  - Name: suffix
                    Value: model.tar.gz

  # ---------------------------------------------------------------------------
  # Main shadow-deployment state machine
  # ---------------------------------------------------------------------------

  # The definition lives in state_machine/shadow_deployment.json; the keys under
  # DefinitionSubstitutions are the placeholders substituted into that file.
  ShadowDeploymentSFN:
    Type: AWS::Serverless::StateMachine
    Properties:
      Name: Shadow-Deployment
      DefinitionUri: state_machine/shadow_deployment.json
      DefinitionSubstitutions:
        EndpointUpdateLambda: !GetAtt EndpointUpdateLambda.Arn
        EndpointStatusCheckLambda: !GetAtt EndpointStatusCheckLambda.Arn
        ComparisonLambda: !GetAtt ComparisonLambda.Arn
        TrafficUpdateLambda: !GetAtt TrafficUpdateLambda.Arn
        ManualApprovalStateMachine: !Ref HumanApprovalLambdaStateMachine
        XGBoostImage: !FindInMap [RegionMap, !Ref "AWS::Region", "XGBoostImage"]
        SageMakerExecutionRole: !GetAtt SageMakerRole.Arn
      # NOTE(review): only a destination is configured; no Level is set.
      # Step Functions' log level is believed to default to OFF - confirm
      # whether execution logging was meant to be enabled here.
      Logging:
        Destinations:
          - CloudWatchLogsLogGroup:
              LogGroupArn: !GetAtt StateMachineLogGroup.Arn
      Policies:
        - LambdaInvokePolicy:
            FunctionName: !Ref EndpointUpdateLambda
        - Statement:
            - Action:
                - events:PutTargets
                - events:PutRule
                - events:DescribeRule
              Effect: Allow
              Resource: '*'
        - Statement:
            - Action:
                - sagemaker:*
                - lambda:InvokeFunction
                - iam:PassRole
              Effect: Allow
              Resource: '*'
        - Statement:
            - Action:
                - states:*
              Effect: Allow
              Resource: !Ref HumanApprovalLambdaStateMachine

  StateMachineLogGroup:
    Type: AWS::Logs::LogGroup
    Properties:
      LogGroupName: /stepfunctions/shadow-deployment

  # Role assumed by SageMaker; passed to ProdModel and, through
  # DefinitionSubstitutions, to the state machine definition.
  SageMakerRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service:
                - "sagemaker.amazonaws.com"
            Action:
              - sts:AssumeRole
      Path: "/"
      # ${AWS::Partition} keeps these ARNs valid in the cn-* and us-gov-*
      # regions that RegionMap lists; it resolves to "aws" elsewhere.
      ManagedPolicyArns:
        - !Sub arn:${AWS::Partition}:iam::aws:policy/AmazonSageMakerFullAccess
        - !Sub arn:${AWS::Partition}:iam::aws:policy/AmazonS3FullAccess
      RoleName: SageMakerExecutionRole

  # Comparison results, keyed by TimeStamp (hash, number) and ModelName (range, string).
  ResultTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: shadow-deployment-result
      AttributeDefinitions:
        - AttributeName: "TimeStamp"
          AttributeType: "N"
        - AttributeName: "ModelName"
          AttributeType: "S"
      KeySchema:
        - AttributeName: TimeStamp
          KeyType: HASH
        - AttributeName: ModelName
          KeyType: RANGE
      BillingMode: PAY_PER_REQUEST

  # ---------------------------------------------------------------------------
  # Workflow Lambda functions (code is packaged from the lambda/ directories)
  # ---------------------------------------------------------------------------

  EndpointUpdateLambda:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: shadow-deployment-endpoint-update
      CodeUri: lambda/endpoint_update
      Policies:
        - Statement:
            - Sid: StateMachine
              Effect: Allow
              Action:
                - ssm:*
              Resource: "*"
      Environment:
        Variables:
          SHADOW_ENDPOINT_SSM: !Ref ShadowEndpointNameParameter

  EndpointStatusCheckLambda:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: shadow-deployment-endpoint-status-check
      CodeUri: lambda/endpoint_status_check
      Policies:
        - Statement:
            - Sid: StateMachine
              Effect: Allow
              Action:
                - sagemaker:DescribeEndpoint
              Resource: "*"

  # SSM parameter holding the shadow endpoint name; created with the
  # placeholder value "None".
  ShadowEndpointNameParameter:
    Type: AWS::SSM::Parameter
    Properties:
      Name: endpoint-name-shadow
      Type: String
      Value: None

  ComparisonLambda:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: shadow-deployment-performance-comparison
      CodeUri: lambda/performance_comparison
      Policies:
        - Statement:
            - Effect: Allow
              Action:
                - dynamodb:*
              Resource: !GetAtt ResultTable.Arn
            - Effect: Allow
              Action:
                - ssm:*
              Resource: "*"
      Environment:
        Variables:
          SHADOW_ENDPOINT_SSM: !Ref ShadowEndpointNameParameter
          PROD_ENDPOINT_SSM: !Ref ProdEndpointNameParameter
          RESULT_TABLE_DDB: !Ref ResultTable

  TrafficUpdateLambda:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: shadow-deployment-traffic-update
      CodeUri: lambda/traffic_update
      Policies:
        - Statement:
            - Effect: Allow
              Action:
                - ssm:*
                - sagemaker:*
              Resource: "*"
      Environment:
        Variables:
          SHADOW_ENDPOINT_SSM: !Ref ShadowEndpointNameParameter
          PROD_ENDPOINT_SSM: !Ref ProdEndpointNameParameter

  # ---------------------------------------------------------------------------
  # Initial production endpoint, built from the ModelUrl artifact
  # ---------------------------------------------------------------------------

  ProdEndpoint:
    Type: AWS::SageMaker::Endpoint
    Properties:
      EndpointConfigName: !GetAtt ProdEndpointConfig.EndpointConfigName
      EndpointName: xgboost-production

  ProdEndpointConfig:
    Type: AWS::SageMaker::EndpointConfig
    Properties:
      ProductionVariants:
        - InitialInstanceCount: 1
          InitialVariantWeight: 1.0
          InstanceType: ml.m4.xlarge
          ModelName: !GetAtt ProdModel.ModelName
          VariantName: !GetAtt ProdModel.ModelName

  ProdModel:
    Type: AWS::SageMaker::Model
    Properties:
      ExecutionRoleArn: !GetAtt SageMakerRole.Arn
      ModelName: xgboost-production
      PrimaryContainer:
        Image: !FindInMap [RegionMap, !Ref "AWS::Region", "XGBoostImage"]
        ModelDataUrl: !Ref ModelUrl

  ProdEndpointNameParameter:
    Type: AWS::SSM::Parameter
    Properties:
      Name: endpoint-name-prod
      Type: String
      Value: !GetAtt ProdEndpoint.EndpointName

  # ---------------------------------------------------------------------------
  # Simulated traffic: scheduled producer -> SQS queue -> processor
  # ---------------------------------------------------------------------------

  TrafficSqsQueue:
    Type: AWS::SQS::Queue

  # Grants the owning account full access to the queue. The principal ARN uses
  # ${AWS::Partition} so it is also valid outside the standard "aws" partition.
  TrafficSqsQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      PolicyDocument:
        Statement:
          - Effect: Allow
            Principal:
              AWS: !Sub arn:${AWS::Partition}:iam::${AWS::AccountId}:root
            Action: SQS:*
            Resource: !GetAtt TrafficSqsQueue.Arn
      Queues:
        - !Ref TrafficSqsQueue

  # Consumes TrafficSqsQueue in batches of up to 10 messages.
  ProcessorFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: shadow-deployment-message-processor
      CodeUri: lambda/processor
      Policies:
        - Statement:
            - Effect: Allow
              Action:
                - dynamodb:*
              Resource: !GetAtt ResultTable.Arn
            - Effect: Allow
              Action:
                - ssm:*
                - sagemaker:*
              Resource: "*"
      Environment:
        Variables:
          SHADOW_ENDPOINT_SSM: !Ref ShadowEndpointNameParameter
          PROD_ENDPOINT_SSM: !Ref ProdEndpointNameParameter
          RESULT_TABLE_DDB: !Ref ResultTable
      Events:
        SQSMessage:
          Type: SQS
          Properties:
            Queue: !GetAtt TrafficSqsQueue.Arn
            BatchSize: 10

  # Runs once a minute; receives the queue URL and the DataUrl parameter and is
  # allowed to send messages to TrafficSqsQueue.
  TrafficSimulatorFunction:
    Type: AWS::Serverless::Function
    Properties:
      FunctionName: shadow-deployment-traffic-simulator
      CodeUri: lambda/traffic_simulator
      Policies:
        - Statement:
            - Effect: Allow
              Action:
                - s3:*
              Resource: "*" #!GetAtt ModelBucket.Arn
            - Effect: Allow
              Action:
                - sqs:SendMessage
              Resource: !GetAtt TrafficSqsQueue.Arn
      Environment:
        Variables:
          QUEUE_URL: !Ref TrafficSqsQueue
          DATA_URL: !Ref DataUrl
      Events:
        PerMinute:
          Type: Schedule
          Properties:
            Schedule: 'rate(1 minute)'

  # ---------------------------------------------------------------------------
  # Human approval: API Gateway endpoint -> LambdaApprovalFunction
  # ---------------------------------------------------------------------------

  ExecutionApi:
    Type: "AWS::ApiGateway::RestApi"
    Properties:
      Name: "Human approval endpoint"
      Description: "HTTP Endpoint backed by API Gateway and Lambda"
      FailOnWarnings: true

  ExecutionResource:
    Type: 'AWS::ApiGateway::Resource'
    Properties:
      RestApiId: !Ref ExecutionApi
      ParentId: !GetAtt "ExecutionApi.RootResourceId"
      PathPart: execution

  # Unauthenticated GET /execution. The request template reshapes the request
  # into { body, headers, method, params, query } for the Lambda, and the 302
  # response takes its Location header from the Lambda result.
  ExecutionMethod:
    Type: "AWS::ApiGateway::Method"
    Properties:
      AuthorizationType: NONE
      HttpMethod: GET
      Integration:
        Type: AWS
        IntegrationHttpMethod: POST
        Uri: !Sub "arn:${AWS::Partition}:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${LambdaApprovalFunction.Arn}/invocations"
        IntegrationResponses:
          - StatusCode: '302'
            ResponseParameters:
              method.response.header.Location: "integration.response.body.headers.Location"
        RequestTemplates:
          application/json: |
            {
              "body" : $input.json('$'),
              "headers": {
                #foreach($header in $input.params().header.keySet())
                "$header": "$util.escapeJavaScript($input.params().header.get($header))" #if($foreach.hasNext),#end
                #end
              },
              "method": "$context.httpMethod",
              "params": {
                #foreach($param in $input.params().path.keySet())
                "$param": "$util.escapeJavaScript($input.params().path.get($param))" #if($foreach.hasNext),#end
                #end
              },
              "query": {
                #foreach($queryParam in $input.params().querystring.keySet())
                "$queryParam": "$util.escapeJavaScript($input.params().querystring.get($queryParam))" #if($foreach.hasNext),#end
                #end
              }
            }
      ResourceId: !Ref ExecutionResource
      RestApiId: !Ref ExecutionApi
      MethodResponses:
        - StatusCode: "302"
          ResponseParameters:
            method.response.header.Location: true

  ApiGatewayAccount:
    Type: 'AWS::ApiGateway::Account'
    Properties:
      CloudWatchRoleArn: !GetAtt "ApiGatewayCloudWatchLogsRole.Arn"

  ApiGatewayCloudWatchLogsRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - apigateway.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Policies:
        - PolicyName: ApiGatewayLogsPolicy
          PolicyDocument:
            # Quoted so YAML treats it as a string, matching the other policy
            # documents in this template.
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - "logs:*"
                Resource: !Sub "arn:${AWS::Partition}:logs:*:*:*"

  # The "states" stage is the one referenced by the approval URLs.
  ExecutionApiStage:
    DependsOn:
      - ApiGatewayAccount
    Type: 'AWS::ApiGateway::Stage'
    Properties:
      DeploymentId: !Ref ApiDeployment
      MethodSettings:
        - DataTraceEnabled: true
          HttpMethod: '*'
          LoggingLevel: INFO
          ResourcePath: /*
      RestApiId: !Ref ExecutionApi
      StageName: states

  ApiDeployment:
    Type: "AWS::ApiGateway::Deployment"
    DependsOn:
      - ExecutionMethod
    Properties:
      RestApiId: !Ref ExecutionApi
      StageName: DummyStage

  # Handles the approve/reject link: reads action, taskToken, sm and ex from the
  # query string, reports {"is_shadow_selected": true|false} to Step Functions
  # via sendTaskSuccess, then redirects (302) to the execution's console page.
  # An unrecognized action fails the request without touching the task token.
  LambdaApprovalFunction:
    Type: 'AWS::Lambda::Function'
    Properties:
      Code:
        ZipFile:
          Fn::Sub: |
            const AWS = require('aws-sdk');
            var redirectToStepFunctions = function(lambdaArn, statemachineName, executionName, callback) {
              const lambdaArnTokens = lambdaArn.split(":");
              const partition = lambdaArnTokens[1];
              const region = lambdaArnTokens[3];
              const accountId = lambdaArnTokens[4];

              console.log("partition=" + partition);
              console.log("region=" + region);
              console.log("accountId=" + accountId);

              const executionArn = "arn:" + partition + ":states:" + region + ":" + accountId + ":execution:" + statemachineName + ":" + executionName;
              console.log("executionArn=" + executionArn);

              const url = "https://console.aws.amazon.com/states/home?region=" + region + "#/executions/details/" + executionArn;
              callback(null, {
                  statusCode: 302,
                  headers: {
                    Location: url
                  }
              });
            };

            exports.handler = (event, context, callback) => {
              console.log('Event= ' + JSON.stringify(event));
              const action = event.query.action;
              const taskToken = event.query.taskToken;
              const statemachineName = event.query.sm;
              const executionName = event.query.ex;

              const stepfunctions = new AWS.StepFunctions();

              var message = "";

              if (action === "approve") {
                message = { "is_shadow_selected": true };
              } else if (action === "reject") {
                message = { "is_shadow_selected": false };
              } else {
                console.error("Unrecognized action. Expected: approve, reject.");
                callback({"Status": "Failed to process the request. Unrecognized Action."});
                // Stop here so a bad link cannot complete the approval task.
                return;
              }

              stepfunctions.sendTaskSuccess({
                output: JSON.stringify(message),
                taskToken: taskToken
              })
              .promise()
              .then(function(data) {
                redirectToStepFunctions(context.invokedFunctionArn, statemachineName, executionName, callback);
              }).catch(function(err) {
                console.error(err, err.stack);
                callback(err);
              });
            }
      Description: Lambda function that callback to AWS Step Functions
      FunctionName: LambdaApprovalFunction
      Handler: index.handler
      Role: !GetAtt "LambdaApiGatewayIAMRole.Arn"
      Runtime: nodejs12.x

  LambdaApiGatewayInvoke:
    Type: "AWS::Lambda::Permission"
    Properties:
      Action: "lambda:InvokeFunction"
      FunctionName: !GetAtt "LambdaApprovalFunction.Arn"
      Principal: "apigateway.amazonaws.com"
      SourceArn: !Sub "arn:${AWS::Partition}:execute-api:${AWS::Region}:${AWS::AccountId}:${ExecutionApi}/*"

  LambdaApiGatewayIAMRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Action:
              - "sts:AssumeRole"
            Effect: "Allow"
            Principal:
              Service:
                - "lambda.amazonaws.com"
      Policies:
        - PolicyName: CloudWatchLogsPolicy
          PolicyDocument:
            Statement:
              - Effect: Allow
                Action:
                  - "logs:*"
                Resource: !Sub "arn:${AWS::Partition}:logs:*:*:*"
        - PolicyName: StepFunctionsPolicy
          PolicyDocument:
            Statement:
              - Effect: Allow
                Action:
                  - "states:SendTaskFailure"
                  - "states:SendTaskSuccess"
                Resource: "*"

  # ---------------------------------------------------------------------------
  # Human approval: nested state machine that emails the approve/reject links
  # ---------------------------------------------------------------------------

  # Single task state that invokes the email Lambda with waitForTaskToken and a
  # one-hour timeout. The payload carries the execution context object ($$),
  # the API endpoint, and the performance_comparison input field.
  HumanApprovalLambdaStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      RoleArn: !GetAtt LambdaStateMachineExecutionRole.Arn
      DefinitionString:
        Fn::Sub: |
          {
            "StartAt": "Lambda Callback",
            "TimeoutSeconds": 3600,
            "States": {
              "Lambda Callback": {
                "Type": "Task",
                "Resource": "arn:${AWS::Partition}:states:::lambda:invoke.waitForTaskToken",
                "Parameters": {
                  "FunctionName": "${LambdaHumanApprovalSendEmailFunction.Arn}",
                  "Payload": {
                    "ExecutionContext.$": "$$",
                    "APIGatewayEndpoint": "https://${ExecutionApi}.execute-api.${AWS::Region}.${AWS::URLSuffix}/states",
                    "performance_comparison.$": "$.performance_comparison"
                  }
                },
                "End": true
              }
            }
          }

  SNSHumanApprovalEmailTopic:
    Type: AWS::SNS::Topic
    Properties:
      Subscription:
        - Endpoint: !Sub ${Email}
          Protocol: email

  # Builds approve/reject URLs (action, ex, sm and the URL-encoded task token as
  # query parameters) and publishes them, with both model scores, to the SNS
  # topic. ${SNSHumanApprovalEmailTopic} is substituted by Fn::Sub at deploy time.
  LambdaHumanApprovalSendEmailFunction:
    Type: "AWS::Lambda::Function"
    Properties:
      Handler: "index.lambda_handler"
      Role: !GetAtt LambdaSendEmailExecutionRole.Arn
      Runtime: "nodejs12.x"
      Code:
        ZipFile:
          Fn::Sub: |
            console.log('Loading function');
            const AWS = require('aws-sdk');
            exports.lambda_handler = (event, context, callback) => {
                let shadow_score = event["performance_comparison"]["shadow_score"];
                let prod_score = event["performance_comparison"]["prod_score"];

                console.log('event= ' + JSON.stringify(event));
                console.log('context= ' + JSON.stringify(context));

                const executionContext = event.ExecutionContext;
                console.log('executionContext= ' + executionContext);

                const executionName = executionContext.Execution.Name;
                console.log('executionName= ' + executionName);

                const statemachineName = executionContext.StateMachine.Name;
                console.log('statemachineName= ' + statemachineName);

                const taskToken = executionContext.Task.Token;
                console.log('taskToken= ' + taskToken);

                const apigwEndpint = event.APIGatewayEndpoint;
                console.log('apigwEndpint = ' + apigwEndpint)

                const approveEndpoint = apigwEndpint + "/execution?action=approve&ex=" + executionName + "&sm=" + statemachineName + "&taskToken=" + encodeURIComponent(taskToken);
                console.log('approveEndpoint= ' + approveEndpoint);

                const rejectEndpoint = apigwEndpint + "/execution?action=reject&ex=" + executionName + "&sm=" + statemachineName + "&taskToken=" + encodeURIComponent(taskToken);
                console.log('rejectEndpoint= ' + rejectEndpoint);

                const emailSnsTopic = "${SNSHumanApprovalEmailTopic}";
                console.log('emailSnsTopic= ' + emailSnsTopic);

                var emailMessage = 'Welcome! \n\n';
                emailMessage += 'This is an email requiring an approval for a step functions execution. \n\n'
                emailMessage += 'Please check the following information and click "Shadow Model" or "Production Model" link if you want to host it in production environment. \n\n'
                emailMessage += '\n\n'
                emailMessage += '\n\n'
                emailMessage += 'Shadow Model Score = ' + shadow_score + '\n\n'
                emailMessage += 'Production Model Score = ' + prod_score + '\n\n'
                emailMessage += '\n\n'
                emailMessage += 'Shadow Model ' + approveEndpoint + '\n\n'
                emailMessage += '\n\n'
                emailMessage += 'Production Model ' + rejectEndpoint + '\n\n'
                emailMessage += 'Thanks for your time!'

                const sns = new AWS.SNS();
                var params = {
                  Message: emailMessage,
                  Subject: "Required approval from AWS Step Functions",
                  TopicArn: emailSnsTopic
                };

                sns.publish(params)
                  .promise()
                  .then(function(data) {
                    console.log("MessageID is " + data.MessageId);
                    callback(null);
                  }).catch(
                    function(err) {
                    console.error(err, err.stack);
                    callback(err);
                  });
            }

  LambdaStateMachineExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: states.amazonaws.com
            Action: "sts:AssumeRole"
      Policies:
        - PolicyName: InvokeCallbackLambda
          PolicyDocument:
            Statement:
              - Effect: Allow
                Action:
                  - "lambda:InvokeFunction"
                Resource:
                  - !Sub "${LambdaHumanApprovalSendEmailFunction.Arn}"

  LambdaSendEmailExecutionRole:
    Type: "AWS::IAM::Role"
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: "sts:AssumeRole"
      Policies:
        - PolicyName: CloudWatchLogsPolicy
          PolicyDocument:
            Statement:
              - Effect: Allow
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                Resource: !Sub "arn:${AWS::Partition}:logs:*:*:*"
        - PolicyName: SNSSendEmailPolicy
          PolicyDocument:
            Statement:
              - Effect: Allow
                Action:
                  - "SNS:Publish"
                Resource:
                  - !Sub "${SNSHumanApprovalEmailTopic}"

Outputs:
  ModelBucketName:
    Description: S3 Bucket to upload new models
    Value: !Ref ModelBucket
  # Output key spelling is kept as-is so existing consumers of the stack
  # output keep working.
  ManuelApprovalTopicName:
    Description: Manual Approval SNS Topic
    Value: !GetAtt SNSHumanApprovalEmailTopic.TopicName
  ApiGatewayInvokeURL:
    Value: !Sub "https://${ExecutionApi}.execute-api.${AWS::Region}.${AWS::URLSuffix}/states"