AWSTemplateFormatVersion: '2010-09-09'
Description: >
  One-time creation for Amazon Forecast MLOps dependencies
  Publish: 20221026
Parameters:
  S3Bucket:
    Description: Provide the S3 Bucket Name to be used
    Type: String
    AllowedPattern: '^[a-z0-9][a-z0-9-_]*[a-z0-9]'
  ExistingS3Bucket:
    Description: Does your S3 bucket already exist?
    Default: false
    Type: String
    AllowedValues:
      - true
      - false
    ConstraintDescription: must specify true or false
Conditions:
  CreateS3Resource: !Equals
    - !Ref ExistingS3Bucket
    - false
Resources:
  ForecastArtifactBucket:
    Type: AWS::S3::Bucket
    Condition: CreateS3Resource
    DeletionPolicy: Retain
    Properties:
      BucketName: !Ref S3Bucket
  ForecastProcessorLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                # Service principals assumed from usage: Lambda runs the functions below,
                # and Amazon Forecast receives this role through iam:PassRole.
                - lambda.amazonaws.com
                - forecast.amazonaws.com
        Version: '2012-10-17'
      Path: "/"
      Policies:
        - PolicyDocument:
            Statement:
              - Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Effect: Allow
                Resource: 'arn:aws:logs:*:*:*'
            Version: '2012-10-17'
          PolicyName: !Sub '${AWS::StackName}-cloudwatch-ops'
        - PolicyDocument:
            Statement:
              - Action:
                  - ssm:Get*
                  - ssm:PutParameter*
                Effect: Allow
                Resource:
                  - !Sub 'arn:aws:ssm:*:${AWS::AccountId}:parameter/forecast/*'
            Version: '2012-10-17'
          PolicyName: !Sub '${AWS::StackName}-ssm-ops'
        - PolicyDocument:
            Statement:
              - Action:
                  - forecast:Describe*
                  - forecast:Get*
                  - forecast:List*
                  - forecast:Query*
                  - forecast:Invoke*
                  - forecast:Tag*
                  - forecast:Untag*
                  - forecast:Create*
                  - forecast:Delete*
                  - forecast:Stop*
                  - forecast:Update*
                  - forecast:Resume*
                Effect: Allow
                Resource: '*'
            Version: '2012-10-17'
          PolicyName: !Sub '${AWS::StackName}-forecast-ops'
        - PolicyDocument:
            Statement:
              - Action:
                  - s3:PutObject
                  - s3:DeleteObject
                  - s3:Get*
                  - s3:List*
                Effect: Allow
                Resource:
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Ref S3Bucket
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Ref S3Bucket
                      - '/*'
            Version: '2012-10-17'
          PolicyName: !Sub '${AWS::StackName}-s3-ops'
        - PolicyDocument:
            Statement:
              - Action:
                  - glue:GetTable
                  - glue:CreateTable
                  - glue:BatchCreatePartition
                Effect: Allow
                Resource:
                  - !Sub 'arn:aws:glue:*:${AWS::AccountId}:database/*'
                  - !Sub 'arn:aws:glue:*:${AWS::AccountId}:catalog'
                  - !Sub 'arn:aws:glue:*:${AWS::AccountId}:table/*/*'
            Version: '2012-10-17'
          PolicyName: !Sub '${AWS::StackName}-glue-ops'
      RoleName: ForecastProcessorLambdaExecutionRole
  LambdaGetForecastMetadata:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: GetForecastMetadata
      Handler: index.lambda_handler
      Runtime: python3.9
      Role: !GetAtt ForecastProcessorLambdaExecutionRole.Arn
      Timeout: 5
      MemorySize: 128
      Code:
        ZipFile: |
          from datetime import datetime

          def lambda_handler(event, context):
              # fall back to 0 when ForecastHorizon is missing or not an integer
              try:
                  forecastHorizon = int(event.get('ForecastHorizon'))
              except Exception:
                  forecastHorizon = 0
              now = datetime.now()
              return {
                  'timeKey': now.strftime('%Y%m%d%H%M%S'),
                  'dateKey': now.strftime('%Y%m%d'),
                  'forecastHorizon': forecastHorizon
              }
      Description: Simple function that provides variables for unique Predictors, Forecasts and job names.
  LambdaForecastCreateDatasetGroup:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: ForecastCreateDatasetGroup
      Handler: index.lambda_handler
      Runtime: python3.9
      Role: !GetAtt ForecastProcessorLambdaExecutionRole.Arn
      Timeout: 30
      MemorySize: 128
      Code:
        ZipFile: |
          import os
          import boto3

          def lambda_handler(event, context):
              StackName = event.get('StackName')
              ssm = boto3.client('ssm')
              region = os.environ['AWS_REGION']
              aws_account_id = context.invoked_function_arn.split(":")[4]
              session = boto3.Session(region_name=region)
              forecast = session.client(service_name='forecast')

              ParameterPrefix = '/forecast/'+StackName+'/DatasetGroup/'
              parameter = ssm.get_parameter(Name=ParameterPrefix+'DatasetGroupName')
              DatasetGroupName = parameter['Parameter']['Value']
              parameter = ssm.get_parameter(Name=ParameterPrefix+'DataDomain')
              DataDomain = parameter['Parameter']['Value']

              DatasetArns = []
              BaseArnPrefix = 'arn:aws:forecast:'+region+':'+aws_account_id+':dataset/'

              # record the ARN of each member dataset (item metadata, RTS, TTS) that exists
              members = (
                  ('_ITEM', 'DatasetArnItem'),
                  ('_RTS', 'DatasetArnRTS'),
                  ('_TTS', 'DatasetArnTTS'),
              )
              for suffix, parameter_name in members:
                  try:
                      response = forecast.describe_dataset(DatasetArn=BaseArnPrefix+DatasetGroupName+suffix)
                  except forecast.exceptions.ResourceNotFoundException:
                      continue
                  dataset_arn = response['DatasetArn']
                  DatasetArns.append(dataset_arn)
                  ssm.put_parameter(Name=ParameterPrefix+parameter_name, Value=dataset_arn, Type='String', Overwrite=True)

              try:
                  response = forecast.create_dataset_group(
                      DatasetGroupName=DatasetGroupName,
                      Domain=DataDomain,
                      DatasetArns=DatasetArns,
                      Tags=[
                          {
                              'Key': 'Createdby',
                              'Value': 'MLOps'
                          },
                      ]
                  )
              except forecast.exceptions.ResourceAlreadyExistsException:
                  # the dataset group already exists, so there is nothing new to record
                  return { 'DatasetGroupArn': None }

              DatasetGroupArn = response['DatasetGroupArn']
              ssm.put_parameter(Name=ParameterPrefix+'DatasetGroupArn', Value=DatasetGroupArn, Type='String', Overwrite=True)
              return { 'DatasetGroupArn': DatasetGroupArn }
      Description: Simple function that creates a variable dataset group based on dataset member existence
  LambdaForecastSelectPredictor:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: ForecastSelectPredictor
      Handler: index.lambda_handler
      Runtime: python3.9
      Role: !GetAtt ForecastProcessorLambdaExecutionRole.Arn
      Timeout: 30
      MemorySize: 128
      Code:
        ZipFile: |
          #THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
          #INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
          #PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
          #HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
          #OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
          #SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

          import os
          import boto3
          from datetime import datetime, timezone

          def lambda_handler(event, context):
              StackName = event.get('StackName')
              ssm = boto3.client('ssm')
              region = os.environ['AWS_REGION']
              session = boto3.Session(region_name=region)
              forecast = session.client(service_name='forecast')
              ParameterPrefix = '/forecast/'+StackName

              # maximum predictor age in days; default to 2 and store the default when unset
              try:
                  parameter = ssm.get_parameter(Name=ParameterPrefix+'/Forecast/PredictorCutoffAge')
                  PredictorCutoffAge = int(parameter['Parameter']['Value'])
              except Exception:
                  PredictorCutoffAge = 2
                  ssm.put_parameter(Name=ParameterPrefix+'/Forecast/PredictorCutoffAge', Value=str(PredictorCutoffAge), Type='String', Overwrite=True)

              # metric used to rank predictors; fall back to the predictor-level setting
              try:
                  parameter = ssm.get_parameter(Name=ParameterPrefix+'/Forecast/ForecastOptimizationMetric')
                  ForecastOptimizationMetric = parameter['Parameter']['Value']
              except Exception:
                  parameter = ssm.get_parameter(Name=ParameterPrefix+'/Predictor/ForecastOptimizationMetric')
                  ForecastOptimizationMetric = parameter['Parameter']['Value']
                  ssm.put_parameter(Name=ParameterPrefix+'/Forecast/ForecastOptimizationMetric', Value=ForecastOptimizationMetric, Type='String', Overwrite=True)

              print('Evaluating Predictors under %s days aged, using lowest error metric of %s' % (
                  str(PredictorCutoffAge), ForecastOptimizationMetric))

              parameter = ssm.get_parameter(Name=ParameterPrefix+'/DatasetGroup/DatasetGroupArn')
              DatasetGroupArn = parameter['Parameter']['Value']

              response = forecast.list_predictors(
                  Filters=[
                      {
                          'Key': 'DatasetGroupArn',
                          'Value': DatasetGroupArn,
                          'Condition': 'IS'
                      },
                  ]
              )

              lowest_metric = 2**32
              lowest_arn = ''

              for i in response['Predictors']:
                  if i['Status'] == 'ACTIVE':
                      eligible = True
                      # first, evaluate tags in case you want to exclude items because of a tag value
                      tags = forecast.list_tags_for_resource(ResourceArn=i['PredictorArn'])
                      for t in tags['Tags']:
                          # set up any other tags desired; this is an example
                          if t['Key']=='ELIGIBLE' and t['Value']=='FALSE':
                              eligible = False
                  else:
                      eligible = False

                  # second, evaluate any criteria about the predictor to disqualify it
                  if eligible:
                      # consider the other fields available in the describe_auto_predictor response
                      predictor = forecast.describe_auto_predictor(PredictorArn=i['PredictorArn'])
                      # CreationTime is timezone-aware, so compare against the current UTC time
                      delta = datetime.now(timezone.utc) - predictor['CreationTime']
                      if delta.days > PredictorCutoffAge:
                          print('Disqualifying ARN %s due to age of %s' % (i['PredictorArn'], delta.days))
                          eligible = False

                  # if not disqualified by tag or predictor attributes, consider this predictor for CreateForecast operation
                  if eligible:
                      # consider the other fields available in the get_accuracy_metrics response
                      predictor = forecast.get_accuracy_metrics(PredictorArn=i['PredictorArn'])
                      for p in predictor['PredictorEvaluationResults']:
                          for tw in p['TestWindows']:
                              if ForecastOptimizationMetric != 'AverageWeightedQuantileLoss':
                                  for em in tw['Metrics']['ErrorMetrics']:
                                      error = float(em[ForecastOptimizationMetric])
                                      print('Evaluating ARN %s with error %s' % (
                                          i['PredictorArn'], error))
                                      if error > 0 and error < lowest_metric:
                                          lowest_metric = error
                                          lowest_arn = i['PredictorArn']
                              else:
                                  AWQL = float(tw['Metrics']['AverageWeightedQuantileLoss'])
                                  print('Evaluating ARN %s with AWQL %s' % (
                                      i['PredictorArn'], AWQL))
                                  if AWQL > 0 and AWQL < lowest_metric:
                                      lowest_metric = AWQL
                                      lowest_arn = i['PredictorArn']

              # determine existing CreateAutoPredictor ARN
              parameter = ssm.get_parameter(Name=ParameterPrefix+'/Forecast/PredictorArn')
              ExistingPredictorArn = parameter['Parameter']['Value']

              # if a new ARN was discovered
              if lowest_metric > 0 and len(lowest_arn)>0 and ExistingPredictorArn != str(lowest_arn):
                  print('Setting Predictor ARN for loss winner=',lowest_arn,lowest_metric )
                  # Set the ARN of winning Predictor
                  ssm.put_parameter(Name=ParameterPrefix+'/Forecast/PredictorArn', Value=lowest_arn, Type='String', Overwrite=True)
                  # Turn on forecasting in case disabled
                  ssm.put_parameter(Name=ParameterPrefix+'/Forecast/Generate', Value='TRUE', Type='String', Overwrite=True)
                  # Set reference ARN and move to RETRAIN strategy
                  ssm.put_parameter(Name=ParameterPrefix+'/Predictor/ReferenceArn', Value=lowest_arn, Type='String', Overwrite=True)
                  ssm.put_parameter(Name=ParameterPrefix+'/Predictor/Strategy', Value='RETRAIN', Type='String', Overwrite=True)
              else:
                  print('No new or additional predictors were found for CreateForecast operation')
      Description: Function to choose a predictor based on age and accuracy conditions
  LambdaForecastPurgeS3Folder:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: ForecastPurgeS3Folder
      Handler: index.lambda_handler
      Runtime: python3.9
      Role: !GetAtt ForecastProcessorLambdaExecutionRole.Arn
      Timeout: 120
      MemorySize: 128
      Code:
        ZipFile: |
          #THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
          #INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
          #PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
          #HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
          #OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
          #SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

          import boto3

          def lambda_handler(event, context):
              # clear S3 location to prevent duplicate inputs
              s3_client = boto3.client('s3')
              BUCKET = event.get('BucketName')
              PREFIX = event.get('Prefix')

              # paginate so that prefixes holding more than 1000 objects are fully purged
              paginator = s3_client.get_paginator('list_objects_v2')
              for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX):
                  for obj in page.get('Contents', []):
                      s3_client.delete_object(Bucket=BUCKET, Key=obj['Key'])
              return event
      Description: Function that purges a named S3 bucket and prefix so it can be repopulated with fresh TTS, RTS, IM for Amazon Forecast to import
  LambdaForecastRemoveAthenaQuotes:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: ForecastRemoveAthenaQuotes
      Handler: index.lambda_handler
      Runtime: python3.9
      Role: !GetAtt ForecastProcessorLambdaExecutionRole.Arn
      Timeout: 300
      MemorySize: 3008
      Code:
        ZipFile: |
          #THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED,
          #INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A
          #PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT
          #HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
          #OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE
          #SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.

          import boto3

          def lambda_handler(event, context):
              # clean up Athena query output under the given S3 location
              s3_client = boto3.client('s3')
              BUCKET = event.get('BucketName')
              PREFIX = event.get('Prefix')

              # paginate so that prefixes holding more than 1000 objects are fully processed
              paginator = s3_client.get_paginator('list_objects_v2')
              for page in paginator.paginate(Bucket=BUCKET, Prefix=PREFIX):
                  for obj in page.get('Contents', []):
                      key = obj['Key']
                      # delete metadata files
                      if key.endswith('metadata'):
                          print('Deleted ', key)
                          s3_client.delete_object(Bucket=BUCKET, Key=key)
                      if key.endswith('csv'):
                          response = s3_client.get_object(Bucket=BUCKET, Key=key)
                          content = response['Body'].read().decode('utf-8')
                          # remove double quotes
                          content = content.replace('"', '')
                          # overwrite the S3 object with the edited file
                          s3_client.put_object(
                              Bucket=BUCKET,
                              Key=key,
                              Body=content.encode('utf-8')
                          )
              return event
      Description: Function to remove double quotes created by Athena Query during step function execution.
  StateMachineExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Statement:
          - Action:
              - sts:AssumeRole
            Effect: Allow
            Principal:
              Service:
                # regional Step Functions service principal; the amazonaws.com suffix is assumed
                - !Sub 'states.${AWS::Region}.amazonaws.com'
        Version: '2012-10-17'
      Path: "/"
      Policies:
        - PolicyDocument:
            Statement:
              - Action:
                  - lambda:InvokeFunction
                Effect: Allow
                Resource:
                  - !Join
                    - ''
                    - - !GetAtt LambdaGetForecastMetadata.Arn
                      - ':*'
                  - !Join
                    - ''
                    - - !GetAtt LambdaForecastCreateDatasetGroup.Arn
                      - ':*'
                  - !Join
                    - ''
                    - - !GetAtt LambdaForecastSelectPredictor.Arn
                      - ':*'
                  - !Join
                    - ''
                    - - !GetAtt LambdaForecastPurgeS3Folder.Arn
                      - ':*'
                  - !Join
                    - ''
                    - - !GetAtt LambdaForecastRemoveAthenaQuotes.Arn
                      - ':*'
            Version: '2012-10-17'
          PolicyName: !Sub '${AWS::StackName}-lambda-ops'
        - PolicyDocument:
            Statement:
              - Action:
                  - athena:StartQueryExecution
                  - athena:GetQueryExecution
                  - athena:GetQueryResults
                  - athena:GetDataCatalog
                  - glue:GetTable
                  - glue:GetPartitions
                  - glue:GetDatabase
                Effect: Allow
                Resource: '*'
            Version: '2012-10-17'
          PolicyName: !Sub '${AWS::StackName}-athena-ops'
        - PolicyDocument:
            Statement:
              - Action:
                  - s3:PutObject
                  - s3:DeleteObject
                  - s3:Get*
                  - s3:List*
                Effect: Allow
                Resource:
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Ref S3Bucket
                  - !Join
                    - ''
                    - - 'arn:aws:s3:::'
                      - !Ref S3Bucket
                      - '/*'
            Version: '2012-10-17'
          PolicyName: !Sub '${AWS::StackName}-s3-ops'
        - PolicyDocument:
            Statement:
              - Action:
                  - forecast:Describe*
                  - forecast:Get*
                  - forecast:List*
                  - forecast:Query*
                  - forecast:Invoke*
                  - forecast:Tag*
                  - forecast:Untag*
                  - forecast:Create*
                  - forecast:Delete*
                  - forecast:Stop*
                  - forecast:Update*
                  - forecast:Resume*
                Effect: Allow
                Resource: '*'
            Version: '2012-10-17'
          PolicyName: !Sub '${AWS::StackName}-forecast-ops'
        - PolicyDocument:
            Statement:
              - Action:
                  - iam:PassRole
                Effect: Allow
                Resource: !GetAtt ForecastProcessorLambdaExecutionRole.Arn
            Version: '2012-10-17'
          PolicyName: !Sub '${AWS::StackName}-iam-passrole-ops'
        - PolicyDocument:
            Statement:
              - Action:
                  - sns:Publish
                Effect: Allow
                Resource:
                  - !Sub 'arn:aws:sns:*:${AWS::AccountId}:*'
            Version: '2012-10-17'
          PolicyName: !Sub '${AWS::StackName}-sns-ops'
        - PolicyDocument:
            Statement:
              - Action:
                  - ssm:GetParameter
                  - ssm:GetParameters
                  - ssm:PutParameter
                Effect: Allow
                Resource:
                  - !Sub 'arn:aws:ssm:*:${AWS::AccountId}:parameter/forecast/*'
            Version: '2012-10-17'
          PolicyName: !Sub '${AWS::StackName}-ssm-ops'
      RoleName: ForecastStepFunctionExecutionRole