# CloudFormation template for a sample ML CI/CD pipeline.
#
# Resources defined here:
#   * IAM roles/policies for Step Functions, Glue, SageMaker and Lambda
#   * A Step Functions state machine (MLPipeline) that runs a Glue ETL job,
#     trains a SageMaker model, registers it, and creates or updates an endpoint
#   * A Glue ETL job (MLPipelineETLJob)
#   * A Lambda layer + function (MLPipelineControllerFunction) that the state
#     machine invokes for experiment tracking, metric lookup and endpoint checks
#
# The staging bucket name is imported from the stack named by pCICDStack
# (export "<pCICDStack>-StagingBucket").
AWSTemplateFormatVersion: '2010-09-09'
Description: CloudFormation template to deploy a sample CI/CD Pipeline for ML
Metadata:
  'AWS::CloudFormation::Interface':
    ParameterGroups:
      - Label:
          default: ML Pipeline Parameters
        Parameters:
          - pMLPipelineName
    ParameterLabels:
      pMLPipelineName:
        default: Step Function ML Pipeline Name
Parameters:
  pCICDStack:
    AllowedPattern: '[A-Za-z0-9-/.]{1,50}'
    Description: Name of your CICD CloudFormation stack to cross reference in this nested stack.
    MaxLength: '50'
    MinLength: '1'
    Type: String
  pMLPipelineName:
    AllowedPattern: '[A-Za-z0-9-]{1,63}'
    ConstraintDescription: >-
      Maximum of 63 alphanumeric characters. Can include hyphens (-), but not
      spaces. Must be unique within your account in an AWS Region.
    Description: Name for your StepFunction ML pipeline
    MaxLength: '63'
    MinLength: '1'
    Type: String
    Default: sample-ml-pipeline
Resources:
  # Grants the Glue ETL role and the SageMaker execution role list/read/write/
  # delete access to the imported staging bucket and its objects.
  StagingBucketAccessPolicy:
    Type: 'AWS::IAM::Policy'
    Properties:
      PolicyName: mlops-min-s3-access
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - 's3:ListBucket'
              - 's3:GetObject'
              - 's3:PutObject'
              - 's3:DeleteObject'
            Resource:
              # The bucket itself.
              - !Join
                - ''
                - - 'arn:aws:s3:::'
                  - Fn::ImportValue: !Sub "${pCICDStack}-StagingBucket"
              # Every object in the bucket.
              - !Join
                - ''
                - - 'arn:aws:s3:::'
                  - !Join
                    - ''
                    - - Fn::ImportValue: !Sub "${pCICDStack}-StagingBucket"
                      - "/*"
      Roles:
        - !Ref MLPipelineETLRole
        - !Ref SageMakerExecutionRole

  # Role assumed by SageMaker; its ARN is injected into the state machine
  # definition as the training job RoleArn and the model ExecutionRoleArn.
  SageMakerExecutionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: mlops-sm-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - sagemaker.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/AmazonSageMakerFullAccess'

  # Permissions attached to the state machine role (MLPipelineRole).
  MLPipelinePolicy:
    Type: 'AWS::IAM::Policy'
    Properties:
      PolicyName: mlops-ml-pipeline-policy
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          # PassRole is limited to roles handed to the SageMaker service.
          - Effect: Allow
            Action:
              - 'iam:PassRole'
            Resource: '*'
            Condition:
              StringEquals:
                'iam:PassedToService': sagemaker.amazonaws.com
          - Effect: Allow
            Action:
              - 'states:*'
              - 'events:DescribeRule'
              - 'events:PutRule'
              - 'events:PutTargets'
              - 'lambda:InvokeFunction'
              - 'sagemaker:CreateModel'
              - 'sagemaker:DeleteEndpointConfig'
              - 'sagemaker:DescribeTrainingJob'
              - 'sagemaker:CreateEndpoint'
              - 'sagemaker:StopTrainingJob'
              - 'sagemaker:CreateTrainingJob'
              - 'sagemaker:UpdateEndpoint'
              - 'sagemaker:CreateEndpointConfig'
              - 'sagemaker:DeleteEndpoint'
              - 'glue:StartJobRun'
              - 'glue:GetJobRun'
              - 'glue:BatchStopJobRun'
              - 'glue:GetJobRuns'
              - 'codepipeline:PutJobFailureResult'
              - 'codepipeline:PutJobSuccessResult'
              - 'codepipeline:GetThirdPartyJobDetails'
              - 'codepipeline:GetJobDetails'
              - 'codepipeline:GetPipeline'
              - 'codepipeline:GetPipelineState'
              - 'codepipeline:GetPipelineExecution'
              - 'codepipeline:ListActionTypes'
            Resource:
              - 'arn:aws:sagemaker:*:*:*'
              - >-
                arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTrainingJobsRule
              - >-
                arn:aws:events:*:*:rule/StepFunctionsGetEventsForSageMakerTransformJobsRule
              - !GetAtt MLPipelineControllerFunction.Arn
              - !Join
                - ''
                - - !Sub 'arn:aws:glue:${AWS::Region}:${AWS::AccountId}:job/'
                  - !Ref MLPipelineETLJob
      Roles:
        - !Ref MLPipelineRole

  # Execution role for the Step Functions state machine.
  MLPipelineRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: mlops-ml-pipeline-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - states.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /

  # Role assumed by the Glue ETL job.
  MLPipelineETLRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: mlops-etl-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - glue.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /

  # State machine flow:
  #   Extract, Transform, Load -> Prepare Experiment -> Model Training
  #   -> Save Model -> Query Training Results -> "Accuracy > 90%" choice
  #   -> Create Model Endpoint Config -> Check Endpoint -> "Endpoint Exists?"
  #   -> Create | Update Model Endpoint -> Validate Endpoint
  # The "Accuracy > 90%" choice proceeds only when the first returned final
  # metric value is < 0.1; otherwise the execution ends in the Fail state.
  # All run-specific values are read from the execution input ($$.Execution.Input).
  # ${SageMakerRole} is replaced by !Sub with the quoted SageMaker role ARN.
  MLPipeline:
    DependsOn:
      - MLPipelinePolicy
      - SageMakerExecutionRole
    Type: 'AWS::StepFunctions::StateMachine'
    Properties:
      StateMachineName: !Ref pMLPipelineName
      DefinitionString: !Sub
        - |-
          {
            "StartAt": "Extract, Transform, Load",
            "States": {
              "Extract, Transform, Load": {
                "Parameters": {
                  "JobName.$": "$$.Execution.Input['GlueJobName']",
                  "Arguments.$": "$$.Execution.Input['GlueArgs']"
                },
                "Resource": "arn:aws:states:::glue:startJobRun.sync",
                "Type": "Task",
                "Next": "Prepare Experiment"
              },
              "Prepare Experiment": {
                "Parameters": {
                  "FunctionName.$": "$$.Execution.Input['MLPipelineControllerName']",
                  "Payload": {
                    "ExperimentName.$": "$$.Execution.Input['ExperimentName']",
                    "ExperimentTrialsPrefix.$": "$$.Execution.Input['ExperimentTrialsPrefix']",
                    "TriggerId.$": "$$.Execution.Input['TriggerId']",
                    "OpName": "Track Experiment"
                  }
                },
                "Resource": "arn:aws:states:::lambda:invoke",
                "ResultPath": "$.TrialName",
                "Type": "Task",
                "Next": "Model Training"
              },
              "Model Training": {
                "Resource": "arn:aws:states:::sagemaker:createTrainingJob.sync",
                "Parameters": {
                  "AlgorithmSpecification": {
                    "TrainingImage.$": "$$.Execution.Input['TrainingImage']",
                    "TrainingInputMode": "File"
                  },
                  "OutputDataConfig": {
                    "S3OutputPath.$": "$$.Execution.Input['ModelS3Path']"
                  },
                  "StoppingCondition": {
                    "MaxRuntimeInSeconds": 86400
                  },
                  "ResourceConfig": {
                    "InstanceCount.$": "$$.Execution.Input['TrainingInstanceCount']",
                    "InstanceType.$": "$$.Execution.Input['TrainingInstanceType']",
                    "VolumeSizeInGB": 30
                  },
                  "RoleArn": ${SageMakerRole},
                  "InputDataConfig": [
                    {
                      "DataSource": {
                        "S3DataSource": {
                          "S3DataType": "S3Prefix",
                          "S3Uri.$": "$$.Execution.Input['TrainDataS3Path']",
                          "S3DataDistributionType": "FullyReplicated"
                        }
                      },
                      "ContentType": "csv",
                      "ChannelName": "train"
                    },
                    {
                      "DataSource": {
                        "S3DataSource": {
                          "S3DataType": "S3Prefix",
                          "S3Uri.$": "$$.Execution.Input['ValidationDataS3Path']",
                          "S3DataDistributionType": "FullyReplicated"
                        }
                      },
                      "ContentType": "csv",
                      "ChannelName": "validation"
                    }
                  ],
                  "HyperParameters.$": "$$.Execution.Input['HyperParameters']",
                  "TrainingJobName.$": "$$.Execution.Input['TrainingJobName']",
                  "ExperimentConfig": {
                    "ExperimentName.$": "$$.Execution.Input['ExperimentName']",
                    "TrialName.$": "$.TrialName.Payload",
                    "TrialComponentDisplayName": "Training"
                  }
                },
                "Type": "Task",
                "ResultPath": "$.TrainingConfigurations",
                "Next": "Save Model"
              },
              "Save Model": {
                "ResultPath": "$.ModelStepResults",
                "Parameters": {
                  "ExecutionRoleArn": ${SageMakerRole},
                  "ModelName.$": "$$.Execution.Input['ModelName']",
                  "PrimaryContainer": {
                    "Environment": {},
                    "Image.$": "$$.Execution.Input['TrainingImage']",
                    "ModelDataUrl.$": "$.TrainingConfigurations.ModelArtifacts.S3ModelArtifacts"
                  }
                },
                "Resource": "arn:aws:states:::sagemaker:createModel",
                "Type": "Task",
                "Next": "Query Training Results"
              },
              "Query Training Results": {
                "Parameters": {
                  "FunctionName.$": "$$.Execution.Input['MLPipelineControllerName']",
                  "Payload": {
                    "TrainingJobName.$": "$.TrainingConfigurations.TrainingJobName",
                    "OpName": "Query Training Results"
                  }
                },
                "Resource": "arn:aws:states:::lambda:invoke",
                "ResultPath": "$.TrainingMetrics",
                "Type": "Task",
                "Next": "Accuracy > 90%"
              },
              "Accuracy > 90%": {
                "Type": "Choice",
                "Choices": [
                  {
                    "Variable": "$.TrainingMetrics.Payload[0].Value",
                    "NumericLessThan": 0.1,
                    "Next": "Create Model Endpoint Config"
                  }
                ],
                "Default": "Failed to Deliver a New Model"
              },
              "Failed to Deliver a New Model": {
                "Comment": "Failed to deliver a model endpoint.",
                "Type": "Fail"
              },
              "Create Model Endpoint Config": {
                "Resource": "arn:aws:states:::sagemaker:createEndpointConfig",
                "Parameters": {
                  "EndpointConfigName.$": "$$.Execution.Input['ModelName']",
                  "DataCaptureConfig": {
                    "CaptureOptions": [
                      {
                        "CaptureMode": "Input"
                      },
                      {
                        "CaptureMode": "Output"
                      }
                    ],
                    "DestinationS3Uri.$": "$$.Execution.Input['DataCaptureS3Path']",
                    "EnableCapture": true,
                    "InitialSamplingPercentage": 100
                  },
                  "ProductionVariants": [
                    {
                      "InitialInstanceCount.$": "$$.Execution.Input['InfBaseInstanceCount']",
                      "InstanceType.$": "$$.Execution.Input['InfInstanceType']",
                      "ModelName.$": "$$.Execution.Input['ModelName']",
                      "VariantName": "AllTraffic"
                    }
                  ]
                },
                "Type": "Task",
                "ResultPath": "$.EndpointConfig",
                "Next": "Check Endpoint"
              },
              "Check Endpoint": {
                "Parameters": {
                  "FunctionName.$": "$$.Execution.Input['MLPipelineControllerName']",
                  "Payload": {
                    "EndpointName.$": "$$.Execution.Input['InferenceEPName']",
                    "OpName": "Check Endpoint"
                  }
                },
                "Resource": "arn:aws:states:::lambda:invoke",
                "Type": "Task",
                "Retry": [
                  {
                    "ErrorEquals": [
                      "EndpointExistsInPendingStateException"
                    ],
                    "IntervalSeconds": 30,
                    "MaxAttempts": 10
                  }
                ],
                "Catch": [
                  {
                    "ErrorEquals": [
                      "EndpointExistsInBadStateException"
                    ],
                    "Next": "Failed to Deliver a New Model"
                  },
                  {
                    "ErrorEquals": [
                      "States.ALL"
                    ],
                    "Next": "Failed to Deliver a New Model"
                  }
                ],
                "ResultPath": "$.EpExists",
                "Next": "Endpoint Exists?"
              },
              "Endpoint Exists?": {
                "Type": "Choice",
                "Choices": [
                  {
                    "Variable": "$.EpExists.Payload",
                    "BooleanEquals": true,
                    "Next": "Update Model Endpoint"
                  }
                ],
                "Default": "Create Model Endpoint"
              },
              "Create Model Endpoint": {
                "Resource": "arn:aws:states:::sagemaker:createEndpoint",
                "Parameters": {
                  "EndpointConfigName.$": "$$.Execution.Input['ModelName']",
                  "EndpointName.$": "$$.Execution.Input['InferenceEPName']"
                },
                "Type": "Task",
                "ResultPath": "$.EndpointInfo",
                "Next": "Validate Endpoint"
              },
              "Update Model Endpoint": {
                "Resource": "arn:aws:states:::sagemaker:updateEndpoint",
                "Parameters": {
                  "EndpointConfigName.$": "$$.Execution.Input['ModelName']",
                  "EndpointName.$": "$$.Execution.Input['InferenceEPName']"
                },
                "Type": "Task",
                "ResultPath": "$.EndpointInfo",
                "Next": "Validate Endpoint"
              },
              "Validate Endpoint": {
                "Parameters": {
                  "FunctionName.$": "$$.Execution.Input['MLPipelineControllerName']",
                  "Payload": {
                    "EndpointName.$": "$$.Execution.Input['InferenceEPName']",
                    "OpName": "Check Endpoint"
                  }
                },
                "Resource": "arn:aws:states:::lambda:invoke",
                "ResultPath": "$.EpExists",
                "Type": "Task",
                "Retry": [
                  {
                    "ErrorEquals": [
                      "EndpointExistsInPendingStateException"
                    ],
                    "IntervalSeconds": 30,
                    "MaxAttempts": 20
                  }
                ],
                "Catch": [
                  {
                    "ErrorEquals": [
                      "EndpointExistsInBadStateException"
                    ],
                    "Next": "Failed to Deliver a New Model"
                  },
                  {
                    "ErrorEquals": [
                      "States.ALL"
                    ],
                    "Next": "Failed to Deliver a New Model"
                  }
                ],
                "End": true
              }
            }
          }
        # Wrap the role ARN in double quotes so it is a valid JSON string
        # where ${SageMakerRole} appears unquoted in the definition above.
        - SageMakerRole: !Join
            - ''
            - - '"'
              - !GetAtt SageMakerExecutionRole.Arn
              - '"'
      RoleArn: !GetAtt MLPipelineRole.Arn

  # Glue job whose script is read from the staging bucket at
  # mlops-staging/config/spark-etl.py.
  # NOTE(review): GlueVersion '1.0' is an old release - confirm it is still
  # supported in the target account/region before deploying.
  MLPipelineETLJob:
    Type: 'AWS::Glue::Job'
    Properties:
      Description: >-
        PySpark job to extract the data and split in to training and validation
        data sets
      Command:
        Name: glueetl
        ScriptLocation: !Join
          - ''
          - - 's3://'
            - Fn::ImportValue: !Sub "${pCICDStack}-StagingBucket"
            - /mlops-staging/config/spark-etl.py
        PythonVersion: '3'
      ExecutionProperty:
        MaxConcurrentRuns: 2
      MaxRetries: 1
      DefaultArguments:
        '--job-language': python
      Name: ml-pipeline-etl-job
      GlueVersion: '1.0'
      WorkerType: Standard
      NumberOfWorkers: 2
      Timeout: 60
      Role: !Ref MLPipelineETLRole

  # Lambda layer providing the smexperiments package imported by the
  # controller function; the zip is read from the staging bucket.
  SMExperimentsLayer:
    Type: AWS::Lambda::LayerVersion
    Properties:
      CompatibleRuntimes:
        - python3.7
      Content:
        S3Bucket:
          Fn::ImportValue: !Sub "${pCICDStack}-StagingBucket"
        S3Key: mlops-staging/layers/smexperiments.zip
      Description: Layer for SageMaker Experiments dependencies
      LayerName: smexperiments
      LicenseInfo: "Apache License 2.0"

  # Execution role for the controller Lambda: basic logging, SageMaker
  # read-only access, plus permission to create experiments and trials.
  MLPipelineControllerRole:
    Type: 'AWS::IAM::Role'
    Properties:
      RoleName: mlops-staging-controller-role
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole'
        - 'arn:aws:iam::aws:policy/AmazonSageMakerReadOnly'
      Policies:
        - PolicyName: mlops-staging-experiments-access
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - "sagemaker:CreateTrial"
                  - "sagemaker:CreateExperiment"
                Resource: "*"

  # Controller Lambda invoked by the state machine. The event's OpName selects
  # one of three operations:
  #   "Track Experiment"       -> ensure the experiment exists, create a trial,
  #                               return the trial name
  #   "Query Training Results" -> return the training job's FinalMetricDataList
  #                               with timestamps converted to epoch seconds
  #   "Check Endpoint"         -> True if the endpoint is InService, False if it
  #                               cannot be described; raises
  #                               EndpointExistsInPendingStateException or
  #                               EndpointExistsInBadStateException otherwise
  #                               (the state machine retries/catches these names)
  # NOTE(review): python3.7 is an old Lambda runtime - confirm it can still be
  # deployed; if it is changed, SMExperimentsLayer.CompatibleRuntimes and the
  # layer contents must be changed with it.
  MLPipelineControllerFunction:
    DependsOn:
      - MLPipelineControllerRole
      - SMExperimentsLayer
    Type: 'AWS::Lambda::Function'
    Properties:
      FunctionName: ml-pipeline-controller
      Role: !GetAtt MLPipelineControllerRole.Arn
      Handler: index.handler
      Description: >-
        Facilitates custom control flow logic for the ML pipeline.
      Timeout: 15
      MemorySize: 128
      Layers:
        - !Ref SMExperimentsLayer
      Runtime: python3.7
      Code:
        ZipFile: |
          import boto3
          import json
          import time
          from smexperiments.experiment import Experiment

          OP_CHECK_EP = "Check Endpoint"
          OP_TRAIN_RESULTS = "Query Training Results"
          OP_TRACK_EXPERIMENT = "Track Experiment"

          sm_client = boto3.client('sagemaker')

          class EndpointExistsInBadStateException(Exception):
              pass

          class EndpointExistsInPendingStateException(Exception):
              pass

          def _require(event, key, what=None):
              # Return event[key]; raise KeyError echoing the input if absent.
              if key not in event:
                  raise KeyError("{} not found in input! The input was: {}.".format(
                      what or key + " key", json.dumps(event)))
              return event[key]

          def track_experiment(xp_name, trial_prefix, job_id):
              # Create the experiment on first use, then add a trial for this run.
              try:
                  sm_client.describe_experiment(ExperimentName=xp_name)
              except sm_client.exceptions.ResourceNotFound:
                  Experiment.create(
                      experiment_name=xp_name,
                      description="Testing SM experiments with SFN",
                      sagemaker_boto_client=sm_client)

              trial_name = trial_prefix+"-{}".format(job_id)
              sm_client.create_trial(
                  TrialName=trial_name,
                  ExperimentName=xp_name)
              return trial_name

          def check_endpoint(ep_name):
              # True when InService, False when it cannot be described,
              # otherwise raises one of the EndpointExists* exceptions.
              try:
                  response = sm_client.describe_endpoint(EndpointName=ep_name)
              except Exception as e:
                  # Any describe failure is treated as "does not exist";
                  # log it so unrelated errors remain visible.
                  print("describe_endpoint failed for {}: {}".format(ep_name, e))
                  return False

              ep_status = response['EndpointStatus']
              if ep_status == "InService":
                  return True
              if ep_status in ("Updating", "Creating"):
                  raise EndpointExistsInPendingStateException("{} exists in {} state. Wait for InService status.".format(ep_name, ep_status))
              raise EndpointExistsInBadStateException("{} exists in {} state. Correct this issue.".format(ep_name, ep_status))

          def handler(event, context):
              op = _require(event, 'OpName')

              if op == OP_TRAIN_RESULTS:
                  job_name = _require(event, 'TrainingJobName')
                  try:
                      response = sm_client.describe_training_job(TrainingJobName=job_name)
                  except Exception as e:
                      # Surface the real failure instead of indexing a string.
                      raise RuntimeError('Failed to read training status!') from e

                  metrics = response['FinalMetricDataList']
                  for metric in metrics:
                      # datetime is not JSON serializable; use epoch seconds.
                      metric['Timestamp'] = metric['Timestamp'].timestamp()
                  return metrics

              elif op == OP_TRACK_EXPERIMENT:
                  xp_name = _require(event, 'ExperimentName')
                  trial_prefix = _require(event, 'ExperimentTrialsPrefix')
                  job_id = _require(event, 'TriggerId', 'CodePipeline JobID')
                  return track_experiment(xp_name, trial_prefix, job_id)

              elif op == OP_CHECK_EP:
                  return check_endpoint(_require(event, 'EndpointName'))

              else:
                  raise Exception("Unrecognized operation: {}".format(op))