AWSTemplateFormatVersion: "2010-09-09"
Transform:
  - AWS::Serverless-2016-10-31
Description: >
  This template creates the resources needed to run an offline reinforcement
  learning workflow that can optimize a process using historical data.

Parameters:
  CreateDummyDataPipeline:
    Description: "Create a Lambda function and Kinesis Data Firehose to produce sample training data"
    Type: String
    Default: "True"
  States:
    Description: "The measurement values to use as the state of the system, separated by commas"
    Type: String
    Default: "cart_position, cart_velocity, pole_angle, pole_angular_velocity, goal_position"
  Actions:
    Description: "The measurement values to use as the actions on the system, separated by commas"
    Type: String
    Default: "external_force"
  Reward:
    Description: "A function of measurement values to use as the reward"
    Type: String
    Default: "reward"

Conditions:
  CreateDummyDataPipelineCondition: !Equals
    - !Ref CreateDummyDataPipeline
    - "True"

Resources:
  #################################
  ########## Nested Stacks ########
  #################################
  MeasurementsFirehose:
    Type: AWS::Serverless::Application
    Condition: CreateDummyDataPipelineCondition
    Properties:
      Location: nested_stacks/firehose_to_s3/template.yaml
      Parameters:
        OutputBucketArn: !GetAtt DataBucket.Arn
        MetadataExtractionQueryString: >
          {
          device_id: .device_id,
          episode_id: .episode_id,
          year: .epoch_time| strftime("%Y"),
          month: .epoch_time| strftime("%m"),
          day: .epoch_time| strftime("%d"),
          hour: .epoch_time| strftime("%H")
          }
        FirehoseOutputS3Prefix: !Join
          - ""
          - - type=measurement/
            - device=!{partitionKeyFromQuery:device_id}/
            - year=!{partitionKeyFromQuery:year}/
            - month=!{partitionKeyFromQuery:month}/
            - day=!{partitionKeyFromQuery:day}/
            - hour=!{partitionKeyFromQuery:hour}/

  #################################
  ########## Security #############
  #################################
  SagemakerDeployRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
            Action: sts:AssumeRole
      Policies:
        - PolicyName: ServeModel
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Sid: AllowPullImage
                Effect: Allow
                Action:
                  - ecr:BatchGetImage
                  - ecr:GetDownloadUrlForLayer
                  - ecr:BatchCheckLayerAvailability
                Resource: "*"
  SagemakerTrainRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
            Action: sts:AssumeRole
  GlueRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        # Quoted so YAML parsers read the policy version as a string, not a date.
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Action: sts:AssumeRole
            Principal:
              Service:
                -
      Policies:
        - PolicyName: AthenaQueryPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Sid: AllowAthenaQuery
                Effect: Allow
                Action:
                  - athena:StartQueryExecution
                  - athena:GetQueryResults
                  - athena:GetWorkGroup
                  - athena:StopQueryExecution
                  - athena:GetQueryExecution
                  - glue:CreateTable
                  - glue:GetTable
                  - glue:GetPartitions
                  - glue:DeleteTable
                  - s3:ListAllMyBuckets
                Resource: "*"
  CloudWatchLogsAndS3Policy:
    Type: AWS::IAM::Policy
    Properties:
      Roles:
        - !Ref SagemakerTrainRole
        - !Ref SagemakerDeployRole
        - !Ref GlueRole
      PolicyName: CloudWatchLogsAndS3
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: AllowS3Crud
            Effect: Allow
            Action:
              - s3:GetBucketLocation
              - s3:GetBucketAcl
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:ListMultipartUploadParts
              - s3:AbortMultipartUpload
              - s3:PutObject
            Resource:
              - !GetAtt AssetsBucket.Arn
              - !Sub "${AssetsBucket.Arn}/*"
              - !GetAtt DataBucket.Arn
              - !Sub "${DataBucket.Arn}/*"
          - Sid: WriteCWLogsAndMetrics
            Effect: Allow
            Action:
              - logs:CreateLogGroup
              - logs:CreateLogStream
              - logs:DescribeLogGroups
              - logs:DescribeLogStreams
              - logs:PutRetentionPolicy
              - logs:PutLogEvents
              - logs:GetLogEvents
              - cloudwatch:PutMetricData
            Resource: "*"

  #################################
  ########## Storage ##############
  #################################
  DataBucket:
    Type: AWS::S3::Bucket
  AssetsBucket:
    Type: AWS::S3::Bucket
  GlueDb:
    Type: AWS::Glue::Database
    # Condition: CreateDummyDataPipelineCondition
    Properties:
      DatabaseInput:
        Name: !Sub "${AWS::StackName}_glue_db"
      CatalogId: !Ref AWS::AccountId
  GlueMeasurementsTable:
    Type: AWS::Glue::Table
    # Condition: CreateDummyDataPipelineCondition
    Properties:
      DatabaseName: !Ref GlueDb
      CatalogId: !Ref AWS::AccountId
      TableInput:
        Name: measurements_table
        TableType: EXTERNAL_TABLE
        Parameters:
          EXTERNAL: True
        StorageDescriptor:
          Columns:
            - Name: cart_position
              Type: double
            - Name: cart_velocity
              Type: double
            - Name: pole_angle
              Type: double
            - Name: pole_angular_velocity
              Type: double
            - Name: goal_position
              Type: double
            - Name: external_force
              Type: double
            - Name: reward
              Type: double
            - Name: done
              Type: boolean
            - Name: info
              Type: string
            - Name: episode_id
              Type: string
            - Name: device_id
              Type: string
            - Name: epoch_time
              Type: double
            - Name: time_step
              Type: int
            - Name: action_source
              Type: string
          Location: !Sub "s3://${DataBucket}/type=measurement/"
          InputFormat: org.apache.hadoop.mapred.TextInputFormat
          OutputFormat:
          SerdeInfo:
            SerializationLibrary:
            Parameters:
              paths: !Sub >
                ${States}, ${Actions}, reward, done,info,episode_id,device_id,epoch_time, time_step, action_source

  ######################################
  ###### Compute Resources #############
  ######################################
  AthenaWorkGroup:
    Type: AWS::Athena::WorkGroup
    Properties:
      Name: !Sub "${AWS::StackName}-workgroup"
      WorkGroupConfiguration:
        ResultConfiguration:
          OutputLocation: !Sub "s3://${DataBucket}/athena_results/"
  GenerateDataStateMachine:
    Type: AWS::Serverless::StateMachine # More info about State Machine Resource:
    Condition: CreateDummyDataPipelineCondition
    Properties:
      DefinitionUri: state_machine/data_generation_state_machine.yaml
      DefinitionSubstitutions:
        RunSimulationFunctionArn: !GetAtt RunPhysicsSimulationFunction.Arn
      Policies: # Find out more about SAM policy templates:
        - LambdaInvokePolicy:
            FunctionName: !Ref RunPhysicsSimulationFunction
  SimulationLayer:
    Type: AWS::Serverless::LayerVersion
    Condition: CreateDummyDataPipelineCondition
    Properties:
      LayerName: SimulationLayer
      Description: This layer has a custom gym environment to run continuous cartpole
      ContentUri: functions/simulation_layer
      RetentionPolicy: Delete
    Metadata:
      BuildMethod: python3.9
  RunPhysicsSimulationFunction:
    Type: AWS::Serverless::Function
    Condition: CreateDummyDataPipelineCondition
    Properties:
      CodeUri: functions/run_physics_simulation/
      Handler: run_physics_simulation_app.lambda_handler
      Runtime: python3.9
      Layers:
        - !Ref SimulationLayer
      Environment:
        Variables:
          DELIVERY_STREAM_NAME: !GetAtt MeasurementsFirehose.Outputs.DeliveryStreamName
      Policies:
        - Statement:
            - Sid: AllowFirehoseWrite
              Effect: Allow
              Action:
                - firehose:PutRecord
                - firehose:PutRecordBatch
              Resource: !GetAtt MeasurementsFirehose.Outputs.DeliveryStreamArn
            - Sid: AllowSagemakerInvokeEndpoint
              Effect: Allow
              Action: sagemaker:InvokeEndpoint
              Resource: "*"
      Timeout: 120
  # HyperparameterTuningOfflineTrainingJobLauncherFunction:
  TuningJobLauncherFunction:
    Type: AWS::Serverless::Function # More info about Function Resource:
    Properties:
      CodeUri: functions/offline_training_job_launcher/
      Handler: hyperparameter_tuning_launcher_app.lambda_handler
      Runtime: python3.9
      Environment:
        Variables:
          TRAIN_INSTANCE_TYPE: "ml.m5.2xlarge"
          SAGEMAKER_TRAINING_BUCKET: !Ref AssetsBucket
          SAGEMAKER_TRAINING_ROLE: !Ref SagemakerTrainRole
          DATA_LOCATION: !Sub "s3://${DataBucket}/json_offline_data/"
          MAX_TUNING_JOBS: 4
          TRAINING_ITERATIONS: 1000
          STATES: !Ref States
          ACTIONS: !Ref Actions
          # TRAINING_IMAGE_URI: !Sub "${AWS::AccountId}.dkr.ecr.${AWS::Region}${TrainingImageName}:latest"
      Policies: # Find out more about SAM policy templates:
        - Statement:
            - Sid: SageMakerTuningJob
              Effect: Allow
              Action:
                - sagemaker:CreateHyperParameterTuningJob
                - sagemaker:AddTags
              Resource: "*"
        - S3CrudPolicy:
            BucketName: !Ref AssetsBucket
        - S3ReadPolicy:
            BucketName: !Ref DataBucket
        - Statement:
            - Sid: GetPassSagemakerTrainingRole
              Effect: Allow
              Action:
                - iam:GetRole
                - iam:PassRole
              Resource: !GetAtt SagemakerTrainRole.Arn
      Timeout: 30
      MemorySize: 256
  ModelDeployerFunction:
    Type: AWS::Serverless::Function # More info about Function Resource:
    Properties:
      CodeUri: functions/model_deployer/
      Handler: model_deployer_app.lambda_handler
      Runtime: python3.9
      Environment:
        Variables:
          SM_MODEL_DEPLOY_ROLE: !Ref SagemakerDeployRole
          MODEL_BUCKET: !Ref AssetsBucket
      Policies: # Find out more about SAM policy templates:
        - S3CrudPolicy:
            BucketName: !Ref AssetsBucket
        - Statement:
            - Sid: SagemakerCreateDeployModel
              Effect: Allow
              Action:
                - sagemaker:CreateEndpoint
                - sagemaker:CreateEndpointConfig
                - sagemaker:CreateModel
                - sagemaker:AddTags
              Resource: "*"
        - Statement:
            - Sid: GetPassSagemakerDeployRole
              Effect: Allow
              Action:
                - iam:GetRole
                - iam:PassRole
              Resource: !GetAtt SagemakerDeployRole.Arn
              # Resource: '*'
      Timeout: 120
      MemorySize: 256

  ####################################################
  ################ Custom Resources ##################
  ####################################################
  # UploadS3CustomResource:
  #   Type: Custom::S3Uploader
  #   Properties:
  #     ServiceToken: !GetAtt S3UploadHandler.Arn
  #     BucketName: !Ref AssetsBucket
  # S3UploadHandler:
  #   Type: AWS::Serverless::Function
  #   Properties:
  #     CodeUri: functions/upload_to_s3/
  #     Handler: app.lambda_handler
  #     Runtime: python3.9
  #     Timeout: 180
  #     MemorySize: 2048
  #     Policies:
  #       - S3CrudPolicy:
  #           BucketName: !Ref AssetsBucket
  #       # - Statement:
  #       #   - Sid: AllowListBuckets # This is required for s3fs
  #       #     Effect: Allow
  #       #     Action:
  #       #       - s3:ListAllMyBuckets
  #       #     Resource: '*'
  #     Environment:
  #       Variables:
  #         BUCKET: !Ref AssetsBucket

Outputs:
  AthenaQueryToCreateJsonFormatedData:
    Description: This query transforms the measurements table into a set of JSON objects that can be ingested by the machine learning algorithm.
    Value: !Sub |
      UNLOAD (
        SELECT *
        FROM (
          SELECT
            'SampleBatch' AS type,
            ARRAY[episode_id] as episode_id,
            ARRAY[row_number() OVER ()] as unroll_id,
            ARRAY[ARRAY[${States}]] as obs,
            ARRAY_AGG(ARRAY[${States}]) OVER (
              PARTITION BY episode_id
              ORDER BY episode_id, epoch_time
              ROWS BETWEEN 1 FOLLOWING AND 1 FOLLOWING
            ) as new_obs,
            ARRAY[ARRAY[${Actions}]] as actions,
            ARRAY[${Reward}] as rewards,
            ARRAY[false] as dones
          FROM (
            SELECT ${States}, ${Actions}, ${Reward}, epoch_time, episode_id, action_source
            FROM "${GlueDb}"."${GlueMeasurementsTable}"
          )
          WHERE action_source = 'LQR_epsilon=0.5'
          ORDER BY episode_id, epoch_time ASC
        ) first_query
        WHERE first_query.new_obs IS NOT NULL
      )
      TO 's3://${DataBucket}/json_offline_data/'
      WITH (format = 'JSON', compression = 'NONE')