AWSTemplateFormatVersion: '2010-09-09' Transform: 'AWS::Serverless-2016-10-31' Description: Creates the Staging Engine component of the Data Lake. Resources: # SNS Topics FileProcessingFailureSNS: Type: AWS::SNS::Topic Properties: DisplayName: SNS topic that file processing failure notifications are sent to. TopicName: !Sub "${EnvironmentPrefix}${FileProcessingFailureTopicName}" # Roles - these need breaking down into finegrained roles for each lambda. LambdaExecutionRole: Type: "AWS::IAM::Role" Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - lambda.amazonaws.com Action: "sts:AssumeRole" Path: "/" Policies: - PolicyName: S3GetPutTag PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - s3:PutObject - s3:GetObject - s3:GetObjectVersionTagging - s3:GetObjectTagging - s3:PutObjectTagging - s3:PutObjectAcl Resource: "*" - PolicyName: KMSBasic PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - kms:Decrypt - kms:Encrypt - kms:PutKeyPolicy - kms:GenerateDataKey Resource: "*" - PolicyName: Logs PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:* Resource: "*" - PolicyName: DDBGetPutScan PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - dynamodb:BatchWriteItem - dynamodb:GetItem - dynamodb:PutItem - dynamodb:GetShardIterator - dynamodb:Scan - dynamodb:Query - dynamodb:UpdateItem - dynamodb:DescribeStream - dynamodb:UpdateTable - dynamodb:GetRecords Resource: "*" LambdaFailedFileProcessorRole: Type: "AWS::IAM::Role" Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - lambda.amazonaws.com Action: "sts:AssumeRole" Path: "/" Policies: - PolicyName: S3AccessForFailedFileProcessorRole PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - s3:PutObject - s3:PutObjectTagging Resource: !Join - '' - - Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-S3Failed-Arn" - /* - Effect: Allow Action: - s3:GetObject - s3:GetObjectVersionTagging - s3:GetObjectTagging - s3:DeleteObject Resource: !Join - '' - - Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-S3Raw-Arn" - /* - Effect: Allow Action: - s3:ListBucket Resource: - Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-S3Raw-Arn" - Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-S3Failed-Arn" - PolicyName: KMSBasic PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - kms:Decrypt - kms:Encrypt - kms:PutKeyPolicy - kms:GenerateDataKey Resource: "*" - PolicyName: Logs PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - logs:* Resource: "*" # SAM Serverless (Lambda) functions StartFileProcessing: Type: 'AWS::Serverless::Function' Properties: Handler: startFileProcessing.lambda_handler Runtime: python3.6 CodeUri: ./src/startFileProcessing.py Description: Initiates File Processing Step Function. This is triggered when new file put into RAW bucket. MemorySize: 128 Timeout: 300 Policies: - arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess - DynamoDBCrudPolicy: TableName: Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-S3FileProcessingCacheTableName" - DynamoDBCrudPolicy: TableName: Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-DataCatalogTableName" - SNSPublishMessagePolicy: TopicName: !Sub "${EnvironmentPrefix}${FileProcessingFailureTopicName}" Environment: Variables: DATA_CATALOG_TABLE_NAME: Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-DataCatalogTableName" DATA_SOURCE_TABLE_NAME: Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-DataSourceTableName" S3_CACHE_TABLE_NAME: Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-S3FileProcessingCacheTableName" SNS_FAILURE_ARN: !Ref FileProcessingFailureSNS STEP_FUNCTION: !Ref FileProcessor STAGING_BUCKET_NAME: Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-S3Staging-Name" RAW_BUCKET_NAME: Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-S3Raw-Name" FAILED_BUCKET_NAME: Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-S3Failed-Name" DependsOn: FileProcessor GetFileType: Type: 'AWS::Serverless::Function' Properties: Handler: getFileType.lambda_handler Runtime: python3.6 CodeUri: ./src/getFileType.py Description: Retrieves the matching file type (data source) for the new file. MemorySize: 128 Timeout: 10 Policies: - DynamoDBReadPolicy: TableName: Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-DataSourceTableName" GetFileSettings: Type: 'AWS::Serverless::Function' Properties: Handler: getFileSettings.lambda_handler Runtime: python3.6 CodeUri: ./src/getFileSettings.py Description: Load the settings for the new file's file type (data source) MemorySize: 128 Timeout: 300 Role: !GetAtt [ LambdaExecutionRole, Arn ] VerifyFileSchema: Type: 'AWS::Serverless::Function' Properties: Handler: verifyFileSchema.lambda_handler Runtime: python3.6 CodeUri: ./src/verifyFileSchema Description: Verify the schema of the file (if configured). MemorySize: 384 Timeout: 600 Role: !GetAtt [ LambdaExecutionRole, Arn ] CalculateMetaDataForFile: Type: 'AWS::Serverless::Function' Properties: Handler: calculateMetaDataForFile.lambda_handler Runtime: python3.6 CodeUri: ./src/calculateMetaDataForFile.py Description: Attach the required tags and metadata to the new file. MemorySize: 128 Timeout: 600 Role: !GetAtt [ LambdaExecutionRole, Arn ] CopyFileFromRawToStaging: Type: 'AWS::Serverless::Function' Properties: Handler: copyFileFromRawToStaging.lambda_handler Runtime: python3.6 CodeUri: ./src/copyFileFromRawToStaging.py Description: Copy the new file, and its tags and metadata to the staging bucket. MemorySize: 128 Timeout: 600 Policies: Role: !GetAtt [ LambdaExecutionRole, Arn ] DeleteRawFile: Type: 'AWS::Serverless::Function' Properties: Handler: deleteRawFile.lambda_handler Runtime: python3.6 CodeUri: ./src/deleteRawFile.py Description: Deletes the raw file after successful or failed staging. MemorySize: 128 Timeout: 600 Role: !GetAtt [ LambdaFailedFileProcessorRole, Arn ] RecordSuccessfulStaging: Type: 'AWS::Serverless::Function' Properties: Handler: recordSuccessfulStaging.lambda_handler Runtime: python3.6 CodeUri: ./src/recordSuccessfulStaging.py Description: Records successful staging in the data lake data catalog, and sends success SNS if configured. MemorySize: 128 Timeout: 300 Policies: - DynamoDBCrudPolicy: TableName: Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-DataCatalogTableName" - SNSPublishMessagePolicy: TopicName: '*' CopyFileFromRawToFailed: Type: 'AWS::Serverless::Function' Properties: Handler: copyFileFromRawToFailed.lambda_handler Runtime: python3.6 CodeUri: ./src/copyFileFromRawToFailed.py Description: Copy files that have failed ingress from the raw to failed bucket. MemorySize: 128 Timeout: 600 Role: !GetAtt [ LambdaFailedFileProcessorRole, Arn ] RecordFailedStaging: Type: 'AWS::Serverless::Function' Properties: Handler: recordFailedStaging.lambda_handler Runtime: python3.6 CodeUri: ./src/recordFailedStaging.py Description: Records failed staging in the data lake data catalog, and sends failure SNS if configured. MemorySize: 128 Timeout: 300 Policies: - DynamoDBCrudPolicy: TableName: Fn::ImportValue: !Sub "${EnvironmentPrefix}DataLake-DataCatalogTableName" - SNSPublishMessagePolicy: TopicName: '*' StatesExecutionRole: Type: "AWS::IAM::Role" Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - !Sub states.${AWS::Region}.amazonaws.com Action: "sts:AssumeRole" Path: "/" Policies: - PolicyName: StatesExecutionPolicy PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Action: - "lambda:InvokeFunction" Resource: "*" # Step Function state machine # Remove WaitForRawBucketReadsToComplete wait state if no clients are reading # data directly from the raw bucket after receiving an S3 PUT trigger. # NOTE: reading from the Raw bucket is not advisable - if you need low-latency / # near real-time access to new data, consider implementing the lambda architecture # with kinesis data streams and kinesis firehose. If you have to read from the raw # bucket for certain data sources, consider making this wait state configurable. FileProcessor: Type: AWS::StepFunctions::StateMachine Properties: StateMachineName: !Sub "${EnvironmentPrefix}stagingengine" DefinitionString: !Sub - |- { "Comment": "State machine to manage the staging of new files added to the data lake", "StartAt": "GetFileType", "States": { "GetFileType": { "Type": "Task", "Resource": "${GetFileTypeArn}", "Comment": "Retrieves the matching file type (data source) for the new file.", "Next": "GetFileSettings", "Catch": [ { "ErrorEquals": ["GetFileTypeException","Exception"], "ResultPath": "$.error-info", "Next": "CopyFileFromRawToFailed" } ], "Retry" : [ { "ErrorEquals": [ "Lambda.Unknown", "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 }, { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 } ] }, "GetFileSettings": { "Type": "Task", "Resource": "${GetFileSettingsArn}", "Comment": "Load the settings for the new file's file type (data source)", "Next": "VerifyFileSchema", "Catch": [ { "ErrorEquals": ["GetFileSettingsException","Exception"], "ResultPath": "$.error-info", "Next": "CopyFileFromRawToFailed" } ], "Retry" : [ { "ErrorEquals": [ "Lambda.Unknown", "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 }, { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 } ] }, "VerifyFileSchema": { "Type": "Task", "Resource": "${VerifyFileSchemaArn}", "Comment": "Verify the schema of the file (if configured).", "Next": "CalculateMetaDataForFile", "Catch": [ { "ErrorEquals": ["VerifyFileSchemaException","Exception"], "ResultPath": "$.error-info", "Next": "CopyFileFromRawToFailed" } ], "Retry" : [ { "ErrorEquals": [ "Lambda.Unknown", "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 }, { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 } ] }, "CalculateMetaDataForFile": { "Type": "Task", "Resource": "${CalculateMetaDataForFileArn}", "Comment": "Attach the required tags and metadata to the new file. ", "Next": "CopyFileFromRawToStaging", "Catch": [ { "ErrorEquals": ["CalculateMetaDataForFileException","Exception"], "ResultPath": "$.error-info", "Next": "CopyFileFromRawToFailed" } ], "Retry" : [ { "ErrorEquals": [ "Lambda.Unknown", "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 }, { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 } ] }, "CopyFileFromRawToStaging": { "Type": "Task", "Resource": "${CopyFileFromRawToStagingArn}", "Comment": "Copy the new file, and its tags and metadata to the staging bucket.", "Next": "WaitForRawBucketReadsToComplete", "Catch": [ { "ErrorEquals": ["CopyFileFromRawToStagingException","Exception"], "ResultPath": "$.error-info", "Next": "CopyFileFromRawToFailed" } ], "Retry" : [ { "ErrorEquals": [ "Lambda.Unknown", "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 }, { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 } ] }, "WaitForRawBucketReadsToComplete": { "Type": "Wait", "Seconds": 10, "Next": "DeleteRawFileAfterSuccessfulStaging" }, "DeleteRawFileAfterSuccessfulStaging": { "Type": "Task", "Resource": "${DeleteRawFileArn}", "Comment": "Deletes the file from the raw bucket.", "Next": "RecordSuccessfulStaging", "Catch": [ { "ErrorEquals": ["DeleteRawFileException","Exception"], "ResultPath": "$.error-info", "Next": "CopyFileFromRawToFailed" } ], "Retry" : [ { "ErrorEquals": [ "Lambda.Unknown", "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 }, { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 } ] }, "RecordSuccessfulStaging": { "Type": "Task", "Resource": "${RecordSuccessfulStagingArn}", "Comment": "Record successful staging in Data Catalog", "Next": "FinishedProcessingSuccessfulFile", "Catch": [ { "ErrorEquals": ["RecordSuccessfulStagingException","Exception"], "ResultPath": "$.error-info", "Next": "CopyFileFromRawToFailed" } ], "Retry" : [ { "ErrorEquals": [ "Lambda.Unknown", "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 }, { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 } ] }, "CopyFileFromRawToFailed": { "Type": "Task", "Resource": "${CopyFileFromRawToFailedArn}", "Comment": "Copy files that have failed ingress from the raw to failed bucket.", "Next": "DeleteRawFileAfterFailedStaging", "Catch": [ { "ErrorEquals": ["CopyFileFromRawToFailedException","Exception"], "ResultPath": "$.error-info", "Next": "RecordFailedStaging" } ], "Retry" : [ { "ErrorEquals": [ "Lambda.Unknown", "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 }, { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 } ] }, "DeleteRawFileAfterFailedStaging": { "Type": "Task", "Resource": "${DeleteRawFileArn}", "Comment": "Deletes the file from the raw bucket.", "Next": "RecordFailedStaging", "Catch": [ { "ErrorEquals": ["DeleteRawFileException","Exception"], "ResultPath": "$.error-info", "Next": "RecordFailedStaging" } ], "Retry" : [ { "ErrorEquals": [ "Lambda.Unknown", "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 }, { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 } ] }, "RecordFailedStaging": { "Type": "Task", "Resource": "${RecordFailedStagingArn}", "Comment": "Record failed staging in Data Catalog", "Next": "FinishedProcessingUnsuccessfulFile", "Catch": [ { "ErrorEquals": ["RecordFailedStagingException","Exception"], "ResultPath": "$.error-info", "Next": "FinishedProcessingUnsuccessfulFile" } ], "Retry" : [ { "ErrorEquals": [ "Lambda.Unknown", "Lambda.ServiceException", "Lambda.AWSLambdaException", "Lambda.SdkClientException" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 }, { "ErrorEquals": [ "States.ALL" ], "IntervalSeconds": 2, "MaxAttempts": 4, "BackoffRate": 1.5 } ] }, "FinishedProcessingUnsuccessfulFile": { "Type": "Pass", "Result": "Fail", "End": true }, "FinishedProcessingSuccessfulFile": { "Type": "Pass", "Result": "Success", "End": true } } } - GetFileTypeArn: !GetAtt [GetFileType, Arn] GetFileSettingsArn: !GetAtt [GetFileSettings, Arn] VerifyFileSchemaArn: !GetAtt [VerifyFileSchema, Arn] CalculateMetaDataForFileArn: !GetAtt [CalculateMetaDataForFile, Arn] RecordSuccessfulStagingArn: !GetAtt [RecordSuccessfulStaging, Arn] CopyFileFromRawToStagingArn: !GetAtt [CopyFileFromRawToStaging, Arn] CopyFileFromRawToFailedArn: !GetAtt [CopyFileFromRawToFailed, Arn] DeleteRawFileArn: !GetAtt [DeleteRawFile, Arn] RecordFailedStagingArn: !GetAtt [RecordFailedStaging, Arn] RoleArn: !GetAtt [ StatesExecutionRole, Arn ] Parameters: FileProcessingFailureTopicName: Type: String Default: datalake-staging-failure Description: Please add a SNS topic name to receive failure notifications EnvironmentPrefix: Type: String Description: Enter the environment prefix used for the DataLake structure (S3 Buckets and DynamoDB tables MinLength: 3 MaxLength: 19 AllowedPattern: "[a-z][a-z0-9-]+" Metadata: 'AWS::CloudFormation::Interface': ParameterGroups: - Label: default: SNS Failure Notifications Parameters: - FileProcessingFailureTopicName