# Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
# SPDX-License-Identifier: MIT-0
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  SAM Template for AWS Step Functions batch processing example

# Defaults applied to every AWS::Serverless::Function in this template.
Globals:
  Function:
    Timeout: 900

Parameters:
  SESSender:
    Type: String
    Default: "sender@example.com"
    Description: Specify the sender email address.
  SESRecipient:
    Type: String
    Default: "recipient@example.com"
    Description: Specify the recipient email address.
  SESIdentityName:
    Type: String
    Default: "sender@example.com"
    Description: >-
      An email address or domain that Amazon SES users use to send email.
      It is a best practice to authorize only specific email addresses such as
      in this case sender@example.com to send emails. If your SES Accounts are
      in sandbox you have to specify both the sender and recipient emails, in
      that case modify the template.yaml to add the permissions for recipient
      email address.
  InputArchiveFolder:
    Type: String
    Default: "input_archive"
    Description: Amazon S3 prefix in the SourceBucket where the input file will be archived after processing.
  FileChunkSize:
    Type: String
    # Quoted so the default is a string, matching Type: String.
    Default: "600"
    Description: Size of each of the chunks, which is split from the input file.
  FileDelimiter:
    Type: String
    Default: ","
    Description: Delimiter of the CSV file (for example, a comma).

# NOTE(review): every function below pins Runtime: python3.8, which AWS Lambda
# has deprecated. Confirm the handlers under source/ on a supported runtime
# before changing it; the runtime is intentionally left untouched here.
Resources:
  # State machine defined in blog-sfn-process-chunk.json. Its role may invoke
  # the four Lambda functions listed, write to ErrorTable and call
  # GET /financials/* on the REST API.
  BlogBatchProcessChunk:
    Type: AWS::Serverless::StateMachine
    Properties:
      DefinitionUri: source/statemachine/blog-sfn-process-chunk.json
      DefinitionSubstitutions:
        ReadFileFunctionArn: !GetAtt ReadFileFunction.Arn
        WriteOutputChunkFunctionArn: !GetAtt WriteOutputChunkFunction.Arn
        ValidateDataFunctionArn: !GetAtt ValidateDataFunction.Arn
        ApiEndpoint: !Sub "${Api}.execute-api.${AWS::Region}.amazonaws.com"
        ErrorTableName: !Ref ErrorTable
      Policies:
        - LambdaInvokePolicy:
            FunctionName: !Ref GetDataFunction
        - LambdaInvokePolicy:
            FunctionName: !Ref ReadFileFunction
        - LambdaInvokePolicy:
            FunctionName: !Ref WriteOutputChunkFunction
        - LambdaInvokePolicy:
            FunctionName: !Ref ValidateDataFunction
        - DynamoDBWritePolicy:
            TableName: !Ref ErrorTable
        - Statement:
            - Sid: AllowApiGatewayInvoke
              Effect: Allow
              Action:
                - execute-api:Invoke
              Resource: !Sub "arn:${AWS::Partition}:execute-api:${AWS::Region}:${AWS::AccountId}:${Api}/*/GET/financials/*"

  # State machine defined in blog-sfn-main-orchestrator.json. Its role may
  # invoke the split/merge/email functions, use the SNS topic and start,
  # describe and stop executions of BlogBatchProcessChunk.
  BlogBatchMainOrchestrator:
    Type: AWS::Serverless::StateMachine
    Properties:
      DefinitionUri: source/statemachine/blog-sfn-main-orchestrator.json
      DefinitionSubstitutions:
        SplitInputFileFunctionArn: !GetAtt SplitInputFileFunction.Arn
        MergeS3FilesFunctionArn: !GetAtt MergeS3FilesFunction.Arn
        SendEmailFunctionArn: !GetAtt SendEmailFunction.Arn
        SNSArn: !Ref SNSTopic
        SESSender: !Ref SESSender
        SESRecipient: !Ref SESRecipient
        BlogBatchProcessChunkArn: !GetAtt BlogBatchProcessChunk.Arn
      Policies:
        - LambdaInvokePolicy:
            FunctionName: !Ref SplitInputFileFunction
        - LambdaInvokePolicy:
            FunctionName: !Ref MergeS3FilesFunction
        - LambdaInvokePolicy:
            FunctionName: !Ref SendEmailFunction
        - SNSCrudPolicy:
            TopicName: !GetAtt SNSTopic.TopicName
        - StepFunctionsExecutionPolicy:
            StateMachineName: !GetAtt BlogBatchProcessChunk.Name
        - Statement:
            - Sid: AllowPutTargets
              Effect: Allow
              Action:
                - events:PutTargets
                - events:PutRule
                - events:DescribeRule
              Resource: !Sub "arn:${AWS::Partition}:events:${AWS::Region}:${AWS::AccountId}:rule/StepFunctionsGetEventsForStepFunctionsExecutionRule"
            - Sid: AllowStatesDescribeStop
              Effect: Allow
              Action:
                - states:DescribeExecution
                - states:StopExecution
              # Uses AWS::Partition like the other ARNs in this template
              # instead of a hard-coded "aws" partition.
              Resource: !Sub "arn:${AWS::Partition}:states:${AWS::Region}:${AWS::AccountId}:execution:${BlogBatchProcessChunk.Name}:*"

  # Lambda with CRUD access to SourceBucket.
  SplitInputFileFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: source/split-ip-file/
      Handler: app.lambda_handler
      Runtime: python3.8
      Policies:
        - S3CrudPolicy:
            BucketName: !Ref SourceBucket

  # KMS-encrypted log group for SplitInputFileFunction, 7-day retention.
  SplitInputFileFunctionLogGroup:
    DependsOn: SplitInputFileFunction
    Type: AWS::Logs::LogGroup
    Properties:
      KmsKeyId: !GetAtt LogGroupKey.Arn
      LogGroupName: !Sub "/aws/lambda/${SplitInputFileFunction}"
      RetentionInDays: 7

  # Lambda with read and write access to SourceBucket.
  MergeS3FilesFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: source/merge-s3-files/
      Handler: app.lambda_handler
      Runtime: python3.8
      Policies:
        - S3ReadPolicy:
            BucketName: !Ref SourceBucket
        - S3WritePolicy:
            BucketName: !Ref SourceBucket

  # KMS-encrypted log group for MergeS3FilesFunction, 7-day retention.
  MergeS3FilesFunctionLogGroup:
    DependsOn: MergeS3FilesFunction
    Type: AWS::Logs::LogGroup
    Properties:
      KmsKeyId: !GetAtt LogGroupKey.Arn
      LogGroupName: !Sub "/aws/lambda/${MergeS3FilesFunction}"
      RetentionInDays: 7

  # Lambda allowed to use the SES identity from SESIdentityName and to read
  # SourceBucket.
  SendEmailFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: source/send-email/
      Handler: app.lambda_handler
      Runtime: python3.8
      Policies:
        - SESCrudPolicy:
            IdentityName: !Ref SESIdentityName
        - S3ReadPolicy:
            BucketName: !Ref SourceBucket

  # KMS-encrypted log group for SendEmailFunction, 7-day retention.
  SendEmailFunctionLogGroup:
    DependsOn: SendEmailFunction
    Type: AWS::Logs::LogGroup
    Properties:
      KmsKeyId: !GetAtt LogGroupKey.Arn
      LogGroupName: !Sub "/aws/lambda/${SendEmailFunction}"
      RetentionInDays: 7

  # REST API with IAM authorization by default, a per-API usage plan and
  # access logging to ApiAccessLogGroup. Created after the account-level
  # CloudWatch role (ApiCWLRoleArn) is in place.
  Api:
    Type: AWS::Serverless::Api
    DependsOn: ApiCWLRoleArn
    Properties:
      StageName: Prod
      Auth:
        DefaultAuthorizer: AWS_IAM
        UsagePlan:
          CreateUsagePlan: PER_API
          UsagePlanName: "blog-api-usage-plan"
          Quota:
            Limit: 100
            Period: DAY
          Throttle:
            BurstLimit: 50
            RateLimit: 100
          Description: "Blog API Usage Plan"
      AccessLogSetting:
        DestinationArn: !Sub "${ApiAccessLogGroup.Arn}"
        Format: "{ 'requestId':'$context.requestId', 'ip': '$context.identity.sourceIp', 'caller':'$context.identity.caller', 'user':'$context.identity.user','requestTime':'$context.requestTime', 'xrayTraceId':'$context.xrayTraceId', 'wafResponseCode':'$context.wafResponseCode', 'httpMethod':'$context.httpMethod','resourcePath':'$context.resourcePath', 'status':'$context.status','protocol':'$context.protocol', 'responseLength':'$context.responseLength' }"

  # KMS-encrypted log group receiving the API access logs, 7-day retention.
  ApiAccessLogGroup:
    Type: AWS::Logs::LogGroup
    DependsOn: Api
    Properties:
      LogGroupName: !Sub "/aws/apigateway/${Api}"
      RetentionInDays: 7
      KmsKeyId: !GetAtt LogGroupKey.Arn

  # KMS key used by every log group in this template. The key policy grants
  # full control to the account root and encrypt/decrypt-style actions to the
  # regional CloudWatch Logs service principal.
  LogGroupKey:
    Type: AWS::KMS::Key
    Properties:
      Enabled: true
      EnableKeyRotation: true
      KeyPolicy:
        # Quoted so YAML loaders do not interpret the version as a date.
        Version: '2012-10-17'
        Id: key-loggroup
        Statement:
          - Sid: Enable IAM User Permissions
            Effect: Allow
            Principal:
              AWS: !Sub "arn:${AWS::Partition}:iam::${AWS::AccountId}:root"
            Action: 'kms:*'
            Resource: '*'
          - Sid: Enable Cloudwatch access
            Effect: Allow
            Principal:
              Service: !Sub "logs.${AWS::Region}.amazonaws.com"
            Action:
              - kms:Encrypt*
              - kms:Decrypt*
              - kms:ReEncrypt*
              - kms:GenerateDataKey*
              - kms:Describe*
            Resource: '*'

  # Account-level API Gateway setting pointing at the CloudWatch logging role.
  ApiCWLRoleArn:
    Type: AWS::ApiGateway::Account
    Properties:
      CloudWatchRoleArn: !GetAtt CloudWatchRole.Arn

  # Role assumed by API Gateway to push logs to CloudWatch.
  CloudWatchRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: apigateway.amazonaws.com
            Action: 'sts:AssumeRole'
      Path: /
      ManagedPolicyArns:
        - !Sub 'arn:${AWS::Partition}:iam::aws:policy/service-role/AmazonAPIGatewayPushToCloudWatchLogs'

  # Lambda behind GET /financials/{uuid}; receives the FinancialTable name in
  # TABLE_NAME and has read access to that table.
  GetDataFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: source/get-data/
      Handler: app.lambda_handler
      Runtime: python3.8
      Environment:
        Variables:
          TABLE_NAME: !Ref FinancialTable
      Policies:
        - AWSLambdaExecute
        - DynamoDBReadPolicy:
            TableName: !Ref FinancialTable
      Events:
        GetData:
          Type: Api
          Properties:
            RestApiId: !Ref Api
            Path: /financials/{uuid}
            Method: get

  # KMS-encrypted log group for GetDataFunction, 7-day retention.
  GetDataFunctionLogGroup:
    DependsOn: GetDataFunction
    Type: AWS::Logs::LogGroup
    Properties:
      KmsKeyId: !GetAtt LogGroupKey.Arn
      LogGroupName: !Sub "/aws/lambda/${GetDataFunction}"
      RetentionInDays: 7

  # Lambda with read access to SourceBucket.
  ReadFileFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: source/read-file/
      Handler: app.lambda_handler
      Runtime: python3.8
      Policies:
        - S3ReadPolicy:
            BucketName: !Ref SourceBucket

  # KMS-encrypted log group for ReadFileFunction, 7-day retention.
  ReadFileFunctionLogGroup:
    DependsOn: ReadFileFunction
    Type: AWS::Logs::LogGroup
    Properties:
      KmsKeyId: !GetAtt LogGroupKey.Arn
      LogGroupName: !Sub "/aws/lambda/${ReadFileFunction}"
      RetentionInDays: 7

  # On-demand DynamoDB table keyed by "uuid" (string), with point-in-time
  # recovery and server-side encryption enabled.
  FinancialTable:
    Type: AWS::DynamoDB::Table
    Properties:
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      SSESpecification:
        SSEEnabled: true
      AttributeDefinitions:
        - AttributeName: uuid
          AttributeType: S
      KeySchema:
        - AttributeName: uuid
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST

  # On-demand DynamoDB table keyed by "uuid" (string), with point-in-time
  # recovery and server-side encryption enabled; written to by
  # BlogBatchProcessChunk.
  ErrorTable:
    Type: AWS::DynamoDB::Table
    Properties:
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      SSESpecification:
        SSEEnabled: true
      AttributeDefinitions:
        - AttributeName: uuid
          AttributeType: S
      KeySchema:
        - AttributeName: uuid
          KeyType: HASH
      BillingMode: PAY_PER_REQUEST

  # Lambda with write access to SourceBucket.
  WriteOutputChunkFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: source/write-output-chunk/
      Handler: app.lambda_handler
      Runtime: python3.8
      Policies:
        - S3WritePolicy:
            BucketName: !Ref SourceBucket

  # KMS-encrypted log group for WriteOutputChunkFunction, 7-day retention.
  WriteOutputChunkFunctionLogGroup:
    DependsOn: WriteOutputChunkFunction
    Type: AWS::Logs::LogGroup
    Properties:
      KmsKeyId: !GetAtt LogGroupKey.Arn
      LogGroupName: !Sub "/aws/lambda/${WriteOutputChunkFunction}"
      RetentionInDays: 7

  # Lambda with no additional policies beyond the SAM defaults.
  ValidateDataFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: source/validate-data/
      Handler: app.lambda_handler
      Runtime: python3.8

  # KMS-encrypted log group for ValidateDataFunction, 7-day retention.
  ValidateDataFunctionLogGroup:
    DependsOn: ValidateDataFunction
    Type: AWS::Logs::LogGroup
    Properties:
      KmsKeyId: !GetAtt LogGroupKey.Arn
      LogGroupName: !Sub "/aws/lambda/${ValidateDataFunction}"
      RetentionInDays: 7

  # Versioned, AES256-encrypted bucket; server access logs go to LoggingBucket.
  SourceBucket:
    Type: AWS::S3::Bucket
    Properties:
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      LoggingConfiguration:
        DestinationBucketName: !Ref LoggingBucket
      VersioningConfiguration:
        Status: Enabled

  # Versioned, AES256-encrypted destination bucket for SourceBucket access logs.
  LoggingBucket:
    Type: 'AWS::S3::Bucket'
    Properties:
      # The LogDeliveryWrite canned ACL needs ACLs to be enabled. New buckets
      # default to BucketOwnerEnforced (ACLs disabled), which makes bucket
      # creation fail unless object ownership is relaxed explicitly.
      OwnershipControls:
        Rules:
          - ObjectOwnership: BucketOwnerPreferred
      AccessControl: LogDeliveryWrite
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      VersioningConfiguration:
        Status: Enabled

  # Lambda allowed to start executions of BlogBatchMainOrchestrator; receives
  # the state machine ARN and the file-handling parameters as environment
  # variables. S3 is permitted to invoke it via S3BucketEventPermission.
  S3NotificationLambdaFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: source/s3-lambda-notification/
      Handler: app.lambda_handler
      Runtime: python3.8
      Policies:
        - StepFunctionsExecutionPolicy:
            StateMachineName: !GetAtt BlogBatchMainOrchestrator.Name
      Environment:
        Variables:
          STATE_MACHINE_EXECUTION_NAME: "BlogBatchMainOrchestrator"
          INPUT_ARCHIVE_FOLDER: !Ref InputArchiveFolder
          FILE_CHUNK_SIZE: !Ref FileChunkSize
          FILE_DELIMITER: !Ref FileDelimiter
          STATE_MACHINE_ARN: !GetAtt BlogBatchMainOrchestrator.Arn

  # KMS-encrypted log group for S3NotificationLambdaFunction, 7-day retention.
  S3NotificationLambdaFunctionLogGroup:
    DependsOn: S3NotificationLambdaFunction
    Type: AWS::Logs::LogGroup
    Properties:
      KmsKeyId: !GetAtt LogGroupKey.Arn
      LogGroupName: !Sub "/aws/lambda/${S3NotificationLambdaFunction}"
      RetentionInDays: 7

  # Lets S3 invoke S3NotificationLambdaFunction, restricted to SourceBucket in
  # this account.
  S3BucketEventPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:invokeFunction
      SourceAccount: !Ref 'AWS::AccountId'
      FunctionName: !Ref S3NotificationLambdaFunction
      SourceArn: !GetAtt SourceBucket.Arn
      Principal: s3.amazonaws.com

  # Execution role for PostStackProcessingFunction: may set the notification
  # configuration on SourceBucket and batch-write items to FinancialTable.
  # NOTE(review): this role grants no CloudWatch Logs permissions, so the
  # function presumably cannot write to its log group - confirm whether that
  # is intentional before adding any.
  PostStackProcessingFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: S3BucketNotificationDynamoDBInsertPolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Sid: AllowBucketNotification
                Effect: Allow
                Action: s3:PutBucketNotification
                Resource:
                  - !Sub 'arn:${AWS::Partition}:s3:::${SourceBucket}'
                  - !Sub 'arn:${AWS::Partition}:s3:::${SourceBucket}/*'
              - Sid: DynamoDBInsert
                Effect: Allow
                Action: dynamodb:BatchWriteItem
                Resource:
                  - !GetAtt FinancialTable.Arn

  # Lambda backing the PostStackProcessing custom resource.
  PostStackProcessingFunction:
    Type: AWS::Serverless::Function
    Properties:
      Description: Function to apply notification to the S3 bucket
      CodeUri: source/custom-resource/
      Handler: app.lambda_handler
      Runtime: python3.8
      Role: !GetAtt PostStackProcessingFunctionRole.Arn

  # KMS-encrypted log group for PostStackProcessingFunction, 7-day retention.
  PostStackProcessingFunctionLogGroup:
    DependsOn: PostStackProcessingFunction
    Type: AWS::Logs::LogGroup
    Properties:
      KmsKeyId: !GetAtt LogGroupKey.Arn
      LogGroupName: !Sub "/aws/lambda/${PostStackProcessingFunction}"
      RetentionInDays: 7

  # Custom resource that calls PostStackProcessingFunction with the bucket,
  # the notification target function ARN, a notification id and the
  # FinancialTable name.
  PostStackProcessing:
    Type: Custom::PostStackProcessing
    Properties:
      ServiceToken: !GetAtt PostStackProcessingFunction.Arn
      S3Bucket: !Ref SourceBucket
      FunctionARN: !GetAtt S3NotificationLambdaFunction.Arn
      NotificationId: S3ObjectCreatedEvent
      FinancialTableName: !Ref FinancialTable

  # SNS topic encrypted with the AWS-managed SNS key.
  SNSTopic:
    Type: AWS::SNS::Topic
    Properties:
      KmsMasterKeyId: alias/aws/sns

Outputs:
  SourceBucketARN:
    Description: "ARN for the Source Bucket"
    Value: !GetAtt SourceBucket.Arn