AWSTemplateFormatVersion: '2010-09-09' Transform: AWS::Serverless-2016-10-31 Description: ETL for OpenAQ data Parameters: TargetBucketName: Type: String AllowedPattern: '^[a-zA-Z0-9.\-_]{1,255}$' Description: Name of the S3 bucket to which transformed data will be uploaded. ChunkSize: Type: Number Default: 24 Description: Batch size specifying number of files to be processed by one invocation of Lambda. MapConcurrency: Type: Number Default: 3 Description: Maximum number of parallel executions of the transform phase in Step Function. LogLevel: Type: String Default: "INFO" AllowedValues: - "INFO" - "DEBUG" NotificationEmailAddress: Type: String AllowedPattern: '[^\s@]+@[^\s@]+\.[^\s@]+' ConstraintDescription: Enter a valid email address Resources: # Lambda functions InitializerFunction: Type: AWS::Serverless::Function Properties: Description: Download OpenAQ data files CodeUri: src/initializer Handler: initializer.lambda_handler Runtime: python3.8 MemorySize: 128 Timeout: 3 Policies: - AWSLambdaExecute # Managed Policy - Version: '2012-10-17' # Policy Document Statement: - Effect: Allow Action: - s3:GetObject - s3:ListBucket Resource: - !Sub arn:${AWS::Partition}:s3:::openaq-fetches/* - !Sub arn:${AWS::Partition}:s3:::openaq-fetches - Version: '2012-10-17' # x-ray policy Statement: - Effect: Allow Action: - xray:PutTraceSegments - xray:PutTelemetryRecords - xray:GetSamplingRules - xray:GetSamplingTargets - xray:GetSamplingStatisticSummaries Resource: '*' Environment: Variables: SOURCE_BUCKET: openaq-fetches PREFIX: realtime-gzipped CHUNK_SIZE: !Ref ChunkSize LOG_LEVEL: !Ref LogLevel Tracing: Active MapperFunction: Type: AWS::Serverless::Function Properties: Description: Process OpenAQ data files CodeUri: src/mapper Handler: mapper.lambda_handler Runtime: python3.8 MemorySize: 1000 Timeout: 60 Policies: - AWSLambdaExecute # Managed Policy - Version: '2012-10-17' # s3 Policy Statement: - Effect: Allow Action: - s3:GetObject - s3:ListBucket Resource: - !Sub arn:${AWS::Partition}:s3:::openaq-fetches/* - !Sub arn:${AWS::Partition}:s3:::openaq-fetches - Effect: "Allow" Action: - s3:GetObject - s3:PutObject Resource: !Sub arn:${AWS::Partition}:s3:::${TargetBucketName}/* - Version: '2012-10-17' # x-ray policy Statement: - Effect: Allow Action: - xray:PutTraceSegments - xray:PutTelemetryRecords - xray:GetSamplingRules - xray:GetSamplingTargets - xray:GetSamplingStatisticSummaries Resource: '*' Environment: Variables: SOURCE_BUCKET: openaq-fetches OUTPUT_BUCKET: !Ref TargetBucketName LOG_LEVEL: !Ref LogLevel Tracing: Active ReducerFunction: Type: AWS::Serverless::Function Properties: Description: Aggregate air quality data CodeUri: src/reducer Handler: reducer.lambda_handler Runtime: python3.8 MemorySize: 1000 Timeout: 15 Policies: - AWSLambdaExecute # Managed Policy - Version: '2012-10-17' # Policy Document Statement: - Effect: "Allow" Action: - s3:GetObject - s3:PutObject Resource: !Sub arn:${AWS::Partition}:s3:::${TargetBucketName}/* - Version: '2012-10-17' # x-ray policy Statement: - Effect: Allow Action: - xray:PutTraceSegments - xray:PutTelemetryRecords - xray:GetSamplingRules - xray:GetSamplingTargets - xray:GetSamplingStatisticSummaries Resource: '*' Environment: Variables: OUTPUT_BUCKET: !Ref TargetBucketName LOG_LEVEL: !Ref LogLevel Tracing: Active CleanupFunction: Type: AWS::Serverless::Function Properties: Description: Delete interemediate result files CodeUri: src/cleanup Handler: cleanup.lambda_handler Runtime: python3.8 MemorySize: 128 Timeout: 15 Policies: - AWSLambdaExecute # Managed Policy - Version: '2012-10-17' # Policy Document Statement: - Effect: "Allow" Action: - s3:DeleteObject Resource: !Sub arn:${AWS::Partition}:s3:::${TargetBucketName}/* - Version: '2012-10-17' # x-ray policy Statement: - Effect: Allow Action: - xray:PutTraceSegments - xray:PutTelemetryRecords - xray:GetSamplingRules - xray:GetSamplingTargets - xray:GetSamplingStatisticSummaries Resource: '*' Environment: Variables: OUTPUT_BUCKET: !Ref TargetBucketName LOG_LEVEL: !Ref LogLevel Tracing: Active # Notification topic NotificationTopic: Type: AWS::SNS::Topic Properties: Subscription: - Protocol: email Endpoint: !Ref NotificationEmailAddress # Step function role StateExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - !Sub states.${AWS::Region}.amazonaws.com Action: - sts:AssumeRole Path: "/" Policies: - PolicyName: "InvokeLambda" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - lambda:InvokeFunction Resource: - !GetAtt InitializerFunction.Arn - !GetAtt MapperFunction.Arn - !GetAtt ReducerFunction.Arn - !GetAtt CleanupFunction.Arn - PolicyName: "PublishSNSPolicy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - sns:Publish Resource: !Ref NotificationTopic - PolicyName: "CWLogsPolicy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - logs:CreateLogDelivery - logs:GetLogDelivery - logs:UpdateLogDelivery - logs:DeleteLogDelivery - logs:ListLogDeliveries - logs:PutResourcePolicy - logs:DescribeResourcePolicies - logs:DescribeLogGroups Resource: "*" # Step function definition ETLOrchestrator: Type: AWS::StepFunctions::StateMachine Properties: DefinitionString: Fn::Sub: - |- { "StartAt": "GetFileInventory", "States": { "GetFileInventory":{ "Type": "Task", "Resource": "${InitializerFunction}", "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "Notify" }], "Next": "ProcessAllFiles" }, "ProcessAllFiles":{ "Type": "Map", "InputPath": "$", "ItemsPath": "$.chunks", "MaxConcurrency": 3, "Iterator": { "StartAt": "ProcessData", "States": { "ProcessData": { "Type": "Task", "Resource": "${MapperFunction}", "End": true } } }, "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "Notify" }], "Next": "WaitTenSeconds" }, "WaitTenSeconds": { "Type": "Wait", "Seconds": 10, "Next": "AggregateResults" }, "AggregateResults":{ "Type": "Task", "Resource": "${ReducerFunction}", "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "CleanupIntermediateResults" }], "Next": "CleanupIntermediateResults" }, "CleanupIntermediateResults":{ "Type": "Task", "Resource": "${CleanupFunction}", "Catch": [{ "ErrorEquals": ["States.ALL"], "Next": "Notify" }], "Next": "Notify" }, "Notify":{ "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "${TopicName}", "Message.$": "$" }, "End": true } } } - { InitializerFunction: !GetAtt InitializerFunction.Arn, MapperFunction: !GetAtt MapperFunction.Arn, ReducerFunction: !GetAtt ReducerFunction.Arn, CleanupFunction: !GetAtt CleanupFunction.Arn, TopicName: !Ref NotificationTopic } RoleArn: !GetAtt StateExecutionRole.Arn LoggingConfiguration: Destinations: - CloudWatchLogsLogGroup: LogGroupArn: !GetAtt ETLOrchestratorLogGroup.Arn IncludeExecutionData: true Level: ERROR ETLOrchestratorLogGroup: Type: AWS::Logs::LogGroup Properties: LogGroupName: ETLOrchestrator RetentionInDays: 7 Outputs: ETLOrchestrator: Value: !Ref ETLOrchestrator InitializerFunction: Value: !Ref InitializerFunction MapperFunction: Value: !Ref MapperFunction ReducerFunction: Value: !Ref ReducerFunction CleanupFunction: Value: !Ref CleanupFunction NotificationTopic: Value: !Ref NotificationTopic