# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. # # Permission is hereby granted, free of charge, to any person obtaining a copy of this # software and associated documentation files (the "Software"), to deal in the Software # without restriction, including without limitation the rights to use, copy, modify, # merge, publish, distribute, sublicense, and/or sell copies of the Software, and to # permit persons to whom the Software is furnished to do so. # # THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, # INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A # PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT # HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION # OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE # SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. AWSTemplateFormatVersion: "2010-09-09" Description: > This template manages sample AWS Step Functions resources to be orchestrate AWS Glue jobs and crawlers. Parameters: MarketingAndSalesDatabaseName: Type: String MinLength: "4" Default: "marketingandsales_qs" Description: "Name of the AWS Glue database." SalesPipelineTableName: Type: String MinLength: "4" Default: "salespipeline_qs" Description: "Name of the Sales Pipeline data table in AWS Glue." MarketingTableName: Type: String MinLength: "4" Default: "marketing_qs" Description: "Name of the Marketing data table in AWS Glue." GlueRunnerActivityName: Type: String Default: "GlueRunnerActivity" Description: "Name of the AWS Step Functions activity to be polled by GlueRunner." AthenaRunnerActivityName: Type: String Default: "AthenaRunnerActivity" Description: "Name of the AWS Step Functions activity to be polled by AthenaRunner." ArtifactBucketName: Type: String MinLength: "1" Description: "Name of the S3 bucket containing source .zip files. Bucket is NOT created by this CFT." LambdaSourceS3Key: Type: String MinLength: "1" Description: "Name of the S3 key of Glue Runner lambda function .zip file." DataBucketName: Type: String MinLength: "1" Description: "Name of the S3 bucket in which the source Marketing and Sales data will be uploaded. Bucket is created by this CFT." Resources: #IAM Resources StateExecutionRole: Type: "AWS::IAM::Role" Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - states.us-east-1.amazonaws.com Action: - sts:AssumeRole Policies: - PolicyName: "StatesExecutionPolicy" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: "lambda:InvokeFunction" Resource: "*" Path: "/" # Activity resources GlueRunnerActivity: Type: "AWS::StepFunctions::Activity" Properties: Name: !Ref GlueRunnerActivityName AthenaRunnerActivity: Type: "AWS::StepFunctions::Activity" Properties: Name: !Ref AthenaRunnerActivityName WaitForSalesDataActivity: Type: "AWS::StepFunctions::Activity" Properties: Name: !Sub "${DataBucketName}-SalesPipeline_QuickSightSample.csv" WaitForMarketingDataActivity: Type: "AWS::StepFunctions::Activity" Properties: Name: !Sub "${DataBucketName}-MarketingData_QuickSightSample.csv" # State Machine resources # State machine for testing Athena Runner AthenaRunnerTestETLOrchestrator: Type: "AWS::StepFunctions::StateMachine" Properties: StateMachineName: AthenaRunnerTestETLOrchestrator DefinitionString: Fn::Sub: - |- { "StartAt": "Configure Athena Query", "States": { "Configure Athena Query":{ "Type": "Pass", "Result": "{ \"AthenaQueryString\" : \"SELECT * FROM ${GlueTableName} limit 10;\", \"AthenaDatabase\": \"${GlueDatabaseName}\", \"AthenaResultOutputLocation\": \"${AthenaResultOutputLocation}\", \"AthenaResultEncryptionOption\": \"${AthenaResultEncryptionOption}\"}", "Next": "Execute Athena Query" }, "Execute Athena Query":{ "Type": "Task", "Resource": "${AthenaRunnerActivityArn}", "End": true } } } - { GlueDatabaseName: !Ref MarketingAndSalesDatabaseName, GlueTableName: !Ref MarketingTableName, AthenaRunnerActivityArn: !Ref AthenaRunnerActivity, AthenaResultOutputLocation: !Sub "s3://${DataBucketName}/athena-runner-output/", AthenaResultEncryptionOption: "SSE_S3" } RoleArn: !GetAtt StateExecutionRole.Arn MarketingAndSalesETLOrchestrator: Type: "AWS::StepFunctions::StateMachine" Properties: StateMachineName: MarketingAndSalesETLOrchestrator DefinitionString: Fn::Sub: - |- { "Comment": "Marketing and Sales ETL Pipeline Orchestrator.", "StartAt": "Pre-process", "States": { "Pre-process": { "Type": "Pass", "Next": "Start Parallel Glue Jobs" }, "Start Parallel Glue Jobs": { "Type": "Parallel", "Branches": [ { "StartAt": "Wait For Sales Data", "States": { "Wait For Sales Data": { "Type": "Task", "Resource": "${WaitForSalesDataActivityArn}", "Next": "Prep for Process Sales Data" }, "Prep for Process Sales Data": { "Type": "Pass", "Result": "{\"GlueJobName\" : \"${ProcessSalesDataGlueJobName}\"}", "Next": "Process Sales Data" }, "Process Sales Data": { "Type": "Task", "Resource": "${GlueRunnerActivityArn}", "End": true } } }, { "StartAt": "Wait For Marketing Data", "States": { "Wait For Marketing Data": { "Type": "Task", "Resource": "${WaitForMarketingDataActivityArn}", "Next": "Prep for Process Marketing Data" }, "Prep for Process Marketing Data": { "Type": "Pass", "Result": "{\"GlueJobName\" : \"${ProcessMarketingDataGlueJobName}\"}", "Next": "Process Marketing Data" }, "Process Marketing Data": { "Type": "Task", "Resource": "${GlueRunnerActivityArn}", "End": true } } } ], "Next": "Prep For Joining Marketing And Sales Data", "Catch": [ { "ErrorEquals": ["GlueJobFailedError"], "Next": "ETL Job Failed Fallback" }] }, "Prep For Joining Marketing And Sales Data": { "Type": "Pass", "Result": "{\"GlueJobName\" : \"${JoinMarketingAndSalesDataJobName}\"}", "Next": "Join Marketing And Sales Data" }, "Join Marketing And Sales Data": { "Type": "Task", "Resource": "${GlueRunnerActivityArn}", "Catch": [ { "ErrorEquals": ["GlueJobFailedError"], "Next": "ETL Job Failed Fallback" }], "End": true }, "ETL Job Failed Fallback": { "Type": "Pass", "Result": "This is a fallback from an ETL job failure.", "End": true } } } - { GlueRunnerActivityArn : !Ref GlueRunnerActivity, WaitForSalesDataActivityArn: !Ref WaitForSalesDataActivity, WaitForMarketingDataActivityArn: !Ref WaitForMarketingDataActivity, ProcessSalesDataGlueJobName: "ProcessSalesData", ProcessMarketingDataGlueJobName: "ProcessMarketingData", JoinMarketingAndSalesDataJobName: "JoinMarketingAndSalesData" } RoleArn: !GetAtt StateExecutionRole.Arn # On S3 Object Created Lambda Functions resources OnS3ObjectCreatedLambdaExecutionRole: Type: "AWS::IAM::Role" Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole ManagedPolicyArns: - arn:aws:iam::aws:policy/AmazonS3FullAccess - arn:aws:iam::aws:policy/AmazonSNSFullAccess - arn:aws:iam::aws:policy/CloudWatchLogsFullAccess - arn:aws:iam::aws:policy/AWSStepFunctionsFullAccess Path: "/" OnS3ObjectCreatedPolicy: Type: "AWS::IAM::Policy" Properties: PolicyDocument: { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "logs:CreateLogStream", "logs:PutLogEvents" ], "Resource": !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" }, { "Effect": "Allow", "Action": "logs:CreateLogGroup", "Resource": "*" }, { "Effect": "Allow", "Action": [ "states:SendTaskSuccess", "states:SendTaskFailure", "states:SendTaskHeartbeat", "states:GetActivityTask" ], "Resource": "*" } ] } PolicyName: "OnS3ObjectCreatedPolicy" Roles: - !Ref OnS3ObjectCreatedLambdaExecutionRole OnS3ObjectCreatedLambdaFunction: Type: "AWS::Lambda::Function" Properties: FunctionName: "ons3objectcreated" Description: "Enables a Step Function State Machine to continue execution after a new object is created in S3." Handler: "ons3objectcreated.handler" Role: !GetAtt OnS3ObjectCreatedLambdaExecutionRole.Arn Code: S3Bucket: !Ref ArtifactBucketName S3Key: !Ref LambdaSourceS3Key Timeout: 180 #seconds MemorySize: 128 #MB Runtime: python2.7 DependsOn: - OnS3ObjectCreatedLambdaExecutionRole # For every bucket that needs to invoke OnS3ObjectCreated: DataBucket: Type: "AWS::S3::Bucket" Properties: BucketName: !Ref DataBucketName NotificationConfiguration: LambdaConfigurations: - Function: !GetAtt OnS3ObjectCreatedLambdaFunction.Arn Event: "s3:ObjectCreated:*" Filter: S3Key: Rules: - Name: "suffix" Value: "csv" DependsOn: - DataBucketPermission DataBucketPermission: Type: "AWS::Lambda::Permission" Properties: Action: 'lambda:InvokeFunction' FunctionName: !Ref OnS3ObjectCreatedLambdaFunction Principal: s3.amazonaws.com SourceAccount: !Ref "AWS::AccountId" SourceArn: !Sub "arn:aws:s3:::${DataBucketName}"