AWSTemplateFormatVersion: 2010-09-09
Description: "CloudFormation Template to deploy an ETL orchestration pipeline by using Redshift Data Api, Step Function and AWS Lambda"

# ---------------------------------------------------------------------------
# Overview
#   * StateMachineExecutionRole    - role assumed by the Step Functions state machine.
#   * RedshiftETLStepFunction      - state machine that reads two SQL scripts from S3
#                                    and runs them plus several stored procedures
#                                    through the Redshift Data API, polling
#                                    DescribeStatement every 5 seconds.
#   * LambdaInvokeStepFunctionRole - role assumed by the starter Lambda.
#   * LambdaInvokeStepFunction     - Lambda backing the custom resource; starts one
#                                    execution of the state machine.
#   * StartStepFunction            - custom resource that invokes the Lambda.
# ---------------------------------------------------------------------------

Parameters:
  RedshiftClusterEndpoint:
    # The cluster identifier is derived as the text before the first "." and
    # the database name as the text after the first "/" (see the !Split uses below).
    Description: Redshift cluster endpoint including port number and database name
    Type: String
    Default: redshift-cluster.xxxxxx.region.redshift.amazonaws.com:5439/dev
  DbUsername:
    Description: Redshift database user name which has access to run SQL Script.
    Type: String
    AllowedPattern: "([a-z])([a-z]|[0-9])*"
    Default: 'awsuser'
  ETLScriptS3Path:
    Description: S3 bucket name (and folder if needed) to store your Stored Procedure and validation SQL script files. Please note, this automation would grant full access on your SQL script files' S3 bucket.
    Type: String
    Default: 'YourS3BucketName'

Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: Input Parameters
        Parameters:
          - RedshiftClusterEndpoint
          - DbUsername
          - ETLScriptS3Path

Mappings:
  Script: # static values related to the Sales Data Pipeline.
    Config:
      SetupScript: 'sp_statements.sql'
      ValidateScript: 'validate_sql_statement.sql'

Resources:
  # Role assumed by the state machine. Grants the Redshift Data API calls used by
  # the workflow, temporary cluster credentials for the configured cluster /
  # database / user, and read access to the script bucket.
  StateMachineExecutionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      Description: IAM Role for the state machine in step function to run
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - !Sub 'states.${AWS::Region}.amazonaws.com'
            Action: 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: RedshiftBatchDataApiPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 'redshift-data:BatchExecuteStatement'
                  - 'redshift-data:ListStatements'
                  - 'redshift-data:GetStatementResult'
                  - 'redshift-data:DescribeStatement'
                  - 'redshift-data:ExecuteStatement'
                Resource: '*'
              - Effect: Allow
                Action:
                  - 'redshift:GetClusterCredentials'
                Resource:
                  - !Sub
                    - 'arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:cluster:${SourceRedshiftClusterIdentifier}'
                    - SourceRedshiftClusterIdentifier: !Select [0, !Split [".", !Ref RedshiftClusterEndpoint]]
                  - !Sub
                    - 'arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbname:${SourceRedshiftClusterIdentifier}/${RedshiftDatabaseName}'
                    - SourceRedshiftClusterIdentifier: !Select [0, !Split [".", !Ref RedshiftClusterEndpoint]]
                      RedshiftDatabaseName: !Select [1, !Split ["/", !Ref RedshiftClusterEndpoint]]
                  - !Sub
                    # ${DbUsername} resolves directly to the template parameter.
                    - 'arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbuser:${SourceRedshiftClusterIdentifier}/${DbUsername}'
                    - SourceRedshiftClusterIdentifier: !Select [0, !Split [".", !Ref RedshiftClusterEndpoint]]
        - PolicyName: ScriptS3AccessPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - 's3:GetBucketLocation'
                  - 's3:GetObject'
                  - 's3:ListBucket'
                Resource:
                  # Only the part of ETLScriptS3Path before the first "/" is used as the bucket name.
                  - !Sub
                    - 'arn:aws:s3:::${ETLScriptS3Bucket}/*'
                    - ETLScriptS3Bucket: !Select [0, !Split ["/", !Ref ETLScriptS3Path]]
                  - !Sub
                    - 'arn:aws:s3:::${ETLScriptS3Bucket}'
                    - ETLScriptS3Bucket: !Select [0, !Split ["/", !Ref ETLScriptS3Path]]

  # ETL workflow. Every Redshift step follows the same pattern:
  #   submit statement -> wait 5s -> DescribeStatement -> Choice on Status.
  # The Choice states route FAILED and ABORTED to a Fail state, FINISHED to the
  # next step, and any other status back to the wait state. Without the ABORTED
  # branch a cancelled statement would be polled indefinitely.
  RedshiftETLStepFunction:
    Type: 'AWS::StepFunctions::StateMachine'
    Properties:
      DefinitionString: !Sub
        - |-
          {
            "Comment": "A simple ETL workflow for loading dimension and fact tables",
            "StartAt": "read_sp_statement",
            "States": {
              "read_sp_statement": {
                "Type": "Task",
                "Parameters": {
                  "Bucket": "${S3BucketName}",
                  "Key": "${SetupScriptFilename}"
                },
                "ResultPath": "$.sql_output",
                "ResultSelector": {
                  "sql_output.$": "$.Body"
                },
                "Resource": "arn:aws:states:::aws-sdk:s3:getObject",
                "Next": "run_sp_deploy_redshift"
              },
              "run_sp_deploy_redshift": {
                "Comment": "Deploy SP on Redshift cluster",
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:redshiftdata:batchExecuteStatement",
                "ResultPath": "$.sql_output",
                "Parameters": {
                  "ClusterIdentifier": "${RedshiftClusterIdentifier}",
                  "Database": "${RedshiftDbName}",
                  "Sqls.$": "States.Array($.sql_output.sql_output)",
                  "DbUser": "${RedshiftDbUser}"
                },
                "Next": "wait_on_sp_deploy_redshift"
              },
              "wait_on_sp_deploy_redshift": {
                "Comment": "Wait before status check",
                "Type": "Wait",
                "Seconds": 5,
                "Next": "run_sp_deploy_redshift_status_check"
              },
              "run_sp_deploy_redshift_status_check": {
                "Comment": "Check Task Status",
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement",
                "ResultPath": "$.sql_output",
                "Parameters": {
                  "Id.$": "$.sql_output.Id"
                },
                "Next": "is_run_sp_deploy_complete"
              },
              "is_run_sp_deploy_complete": {
                "Comment": "check if run_sp_deploy step is complete",
                "Type": "Choice",
                "Choices": [
                  {
                    "Variable": "$.sql_output.Status",
                    "StringEquals": "FAILED",
                    "Next": "sales_data_pipeline_failure"
                  },
                  {
                    "Variable": "$.sql_output.Status",
                    "StringEquals": "ABORTED",
                    "Next": "sales_data_pipeline_failure"
                  },
                  {
                    "Variable": "$.sql_output.Status",
                    "StringEquals": "FINISHED",
                    "Next": "setup_sales_data_pipeline"
                  }
                ],
                "Default": "wait_on_sp_deploy_redshift"
              },
              "setup_sales_data_pipeline": {
                "Comment": "Invoke redshift batch execute",
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
                "ResultPath": "$.sql_output",
                "Parameters": {
                  "ClusterIdentifier": "${RedshiftClusterIdentifier}",
                  "Database": "${RedshiftDbName}",
                  "Sql": "call public.sp_setup_sales_data_pipeline()",
                  "DbUser": "${RedshiftDbUser}"
                },
                "Next": "wait_on_setup_sales_data_pipeline"
              },
              "wait_on_setup_sales_data_pipeline": {
                "Comment": "Wait before status check",
                "Type": "Wait",
                "Seconds": 5,
                "Next": "setup_sales_data_pipeline_status_check"
              },
              "setup_sales_data_pipeline_status_check": {
                "Comment": "Check Task Status",
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement",
                "ResultPath": "$.sql_output",
                "Parameters": {
                  "Id.$": "$.sql_output.Id"
                },
                "Next": "is_setup_sales_data_pipeline_complete"
              },
              "is_setup_sales_data_pipeline_complete": {
                "Comment": "check if setup_sales_data_pipeline step is complete",
                "Type": "Choice",
                "Choices": [
                  {
                    "Variable": "$.sql_output.Status",
                    "StringEquals": "FAILED",
                    "Next": "sales_data_pipeline_failure"
                  },
                  {
                    "Variable": "$.sql_output.Status",
                    "StringEquals": "ABORTED",
                    "Next": "sales_data_pipeline_failure"
                  },
                  {
                    "Variable": "$.sql_output.Status",
                    "StringEquals": "FINISHED",
                    "Next": "run_sales_data_pipeline"
                  }
                ],
                "Default": "wait_on_setup_sales_data_pipeline"
              },
              "run_sales_data_pipeline": {
                "Comment": "Load 2 dimension tables",
                "Type": "Parallel",
                "ResultPath": "$.sql_output",
                "Next": "run_load_fact_sales",
                "Branches": [
                  {
                    "StartAt": "LoadItemTable",
                    "States": {
                      "LoadItemTable": {
                        "Comment": "Load Item Table",
                        "Type": "Task",
                        "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
                        "ResultPath": "$.sql_output",
                        "Parameters": {
                          "ClusterIdentifier": "${RedshiftClusterIdentifier}",
                          "Database": "${RedshiftDbName}",
                          "Sql": "call public.sp_load_dim_item()",
                          "DbUser": "${RedshiftDbUser}"
                        },
                        "Next": "wait_on_load_item_table"
                      },
                      "wait_on_load_item_table": {
                        "Comment": "Wait before status check",
                        "Type": "Wait",
                        "Seconds": 5,
                        "Next": "load_item_table_status_check"
                      },
                      "load_item_table_status_check": {
                        "Comment": "Check Task Status",
                        "Type": "Task",
                        "Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement",
                        "ResultPath": "$.sql_output",
                        "Parameters": {
                          "Id.$": "$.sql_output.Id"
                        },
                        "Next": "is_load_item_table_complete"
                      },
                      "is_load_item_table_complete": {
                        "Comment": "check if load_item_table_complete step is complete",
                        "Type": "Choice",
                        "Choices": [
                          {
                            "Variable": "$.sql_output.Status",
                            "StringEquals": "FAILED",
                            "Next": "load_item_table_failed"
                          },
                          {
                            "Variable": "$.sql_output.Status",
                            "StringEquals": "ABORTED",
                            "Next": "load_item_table_failed"
                          },
                          {
                            "Variable": "$.sql_output.Status",
                            "StringEquals": "FINISHED",
                            "Next": "load_item_table_success"
                          }
                        ],
                        "Default": "wait_on_load_item_table"
                      },
                      "load_item_table_success": {
                        "Type": "Pass",
                        "Result": "load_item_table_success",
                        "End": true
                      },
                      "load_item_table_failed": {
                        "Type": "Fail",
                        "Cause": "Failure on load item table",
                        "Error": "Error"
                      }
                    }
                  },
                  {
                    "StartAt": "LoadCustomerAddressTable",
                    "States": {
                      "LoadCustomerAddressTable": {
                        "Comment": "Load Customer Address Table",
                        "Type": "Task",
                        "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
                        "ResultPath": "$.sql_output",
                        "Parameters": {
                          "ClusterIdentifier": "${RedshiftClusterIdentifier}",
                          "Database": "${RedshiftDbName}",
                          "Sql": "call public.sp_load_dim_customer_address()",
                          "DbUser": "${RedshiftDbUser}"
                        },
                        "Next": "wait_on_load_customer_address_table"
                      },
                      "wait_on_load_customer_address_table": {
                        "Comment": "Wait before status check",
                        "Type": "Wait",
                        "Seconds": 5,
                        "Next": "load_customer_address_table_status_check"
                      },
                      "load_customer_address_table_status_check": {
                        "Comment": "Check Task Status",
                        "Type": "Task",
                        "Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement",
                        "ResultPath": "$.sql_output",
                        "Parameters": {
                          "Id.$": "$.sql_output.Id"
                        },
                        "Next": "is_load_customer_address_table_complete"
                      },
                      "is_load_customer_address_table_complete": {
                        "Comment": "check if load_customer_address_table step is complete",
                        "Type": "Choice",
                        "Choices": [
                          {
                            "Variable": "$.sql_output.Status",
                            "StringEquals": "FAILED",
                            "Next": "load_customer_address_table_failed"
                          },
                          {
                            "Variable": "$.sql_output.Status",
                            "StringEquals": "ABORTED",
                            "Next": "load_customer_address_table_failed"
                          },
                          {
                            "Variable": "$.sql_output.Status",
                            "StringEquals": "FINISHED",
                            "Next": "load_customer_address_table_success"
                          }
                        ],
                        "Default": "wait_on_load_customer_address_table"
                      },
                      "load_customer_address_table_success": {
                        "Type": "Pass",
                        "Result": "load_customer_address_table_success",
                        "End": true
                      },
                      "load_customer_address_table_failed": {
                        "Type": "Fail",
                        "Cause": "Failure on load customer_address table",
                        "Error": "Error"
                      }
                    }
                  }
                ]
              },
              "run_load_fact_sales": {
                "Comment": "Load final fact table",
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:redshiftdata:executeStatement",
                "ResultPath": "$.sql_output",
                "Parameters": {
                  "ClusterIdentifier": "${RedshiftClusterIdentifier}",
                  "Database": "${RedshiftDbName}",
                  "Sql": "call public.sp_load_fact_sales(trunc(sysdate))",
                  "DbUser": "${RedshiftDbUser}"
                },
                "Next": "wait_on_run_load_fact_sales_pipeline"
              },
              "wait_on_run_load_fact_sales_pipeline": {
                "Comment": "Wait before status check",
                "Type": "Wait",
                "Seconds": 5,
                "Next": "run_load_fact_sales_status_check"
              },
              "run_load_fact_sales_status_check": {
                "Comment": "Check Task Status",
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement",
                "ResultPath": "$.sql_output",
                "Parameters": {
                  "Id.$": "$.sql_output.Id"
                },
                "Next": "is_run_load_fact_sales_pipeline_complete"
              },
              "is_run_load_fact_sales_pipeline_complete": {
                "Comment": "check if load_fact_sales step is complete",
                "Type": "Choice",
                "Choices": [
                  {
                    "Variable": "$.sql_output.Status",
                    "StringEquals": "FAILED",
                    "Next": "sales_data_pipeline_failure"
                  },
                  {
                    "Variable": "$.sql_output.Status",
                    "StringEquals": "ABORTED",
                    "Next": "sales_data_pipeline_failure"
                  },
                  {
                    "Variable": "$.sql_output.Status",
                    "StringEquals": "FINISHED",
                    "Next": "read_validate_fact_sales_sql_statement"
                  }
                ],
                "Default": "wait_on_run_load_fact_sales_pipeline"
              },
              "sales_data_pipeline_failure": {
                "Type": "Fail",
                "Cause": "Failure on Sales Data Pipeline",
                "Error": "Error"
              },
              "read_validate_fact_sales_sql_statement": {
                "Type": "Task",
                "Parameters": {
                  "Bucket": "${S3BucketName}",
                  "Key": "${ValidateScriptFilename}"
                },
                "ResultPath": "$.sql_output",
                "ResultSelector": {
                  "sql_output.$": "$.Body"
                },
                "Resource": "arn:aws:states:::aws-sdk:s3:getObject",
                "Next": "run_validate_fact_sales"
              },
              "run_validate_fact_sales": {
                "Comment": "Validate validate_fact_sales",
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:redshiftdata:batchExecuteStatement",
                "ResultPath": "$.sql_output",
                "Parameters": {
                  "ClusterIdentifier": "${RedshiftClusterIdentifier}",
                  "Database": "${RedshiftDbName}",
                  "Sqls.$": "States.Array($.sql_output.sql_output)",
                  "DbUser": "${RedshiftDbUser}"
                },
                "Next": "wait_on_run_validate_fact_sales"
              },
              "wait_on_run_validate_fact_sales": {
                "Comment": "Wait before status check",
                "Type": "Wait",
                "Seconds": 5,
                "Next": "run_validate_fact_sales_status_check"
              },
              "run_validate_fact_sales_status_check": {
                "Comment": "Check Task Status",
                "Type": "Task",
                "Resource": "arn:aws:states:::aws-sdk:redshiftdata:describeStatement",
                "ResultPath": "$.sql_output",
                "Parameters": {
                  "Id.$": "$.sql_output.Id"
                },
                "Next": "is_validate_fact_sales_complete"
              },
              "is_validate_fact_sales_complete": {
                "Comment": "check if validate_fact_sales step is complete",
                "Type": "Choice",
                "Choices": [
                  {
                    "Variable": "$.sql_output.Status",
                    "StringEquals": "FAILED",
                    "Next": "sales_data_pipeline_failure"
                  },
                  {
                    "Variable": "$.sql_output.Status",
                    "StringEquals": "ABORTED",
                    "Next": "sales_data_pipeline_failure"
                  },
                  {
                    "Variable": "$.sql_output.Status",
                    "StringEquals": "FINISHED",
                    "Next": "validate_fact_sales_complete"
                  }
                ],
                "Default": "wait_on_run_validate_fact_sales"
              },
              "validate_fact_sales_complete": {
                "Type": "Pass",
                "Result": "load_fact_sales_pipeline_success",
                "End": true
              }
            }
          }
        - RedshiftClusterIdentifier: !Select [0, !Split [".", !Ref RedshiftClusterEndpoint]]
          RedshiftDbName: !Select [1, !Split ["/", !Ref RedshiftClusterEndpoint]]
          RedshiftDbUser: !Ref DbUsername
          # NOTE(review): the whole ETLScriptS3Path value is passed as the S3 "Bucket",
          # while the IAM policy above only uses the part before the first "/".
          # A value of the form "bucket/folder" would therefore presumably not work
          # here without also prefixing the object keys - confirm before relying on it.
          S3BucketName: !Ref ETLScriptS3Path
          SetupScriptFilename: !FindInMap [Script, Config, SetupScript]
          ValidateScriptFilename: !FindInMap [Script, Config, ValidateScript]
      RoleArn: !GetAtt StateMachineExecutionRole.Arn

  # Role assumed by the starter Lambda: may start the state machine, write its
  # own logs, and access the CloudFormation response / wait-condition buckets.
  LambdaInvokeStepFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      RoleName: !Sub "LambdaInvokeStepFunctionRole-${AWS::AccountId}"
      Description: IAM Role for lambda to execute the Step Function
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      ManagedPolicyArns:
        # Lets the function write to CloudWatch Logs so the handler's print()
        # output and tracebacks are actually retained.
        - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole
      Policies:
        - PolicyName: LambdaInvokePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: "Allow"
                Action:
                  - states:StartExecution
                Resource: !Ref RedshiftETLStepFunction
        - PolicyName: LambdaCloudFormationPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - s3:*
                Resource:
                  - !Sub "arn:aws:s3:::cloudformation-custom-resource-response-${AWS::Region}"
                  - !Sub "arn:aws:s3:::cloudformation-waitcondition-${AWS::Region}"
                  - !Sub "arn:aws:s3:::cloudformation-custom-resource-response-${AWS::Region}/*"
                  - !Sub "arn:aws:s3:::cloudformation-waitcondition-${AWS::Region}/*"

  # Custom-resource handler: on any request other than Delete it starts one
  # execution of the state machine whose ARN is passed in ResourceProperties,
  # then reports SUCCESS or FAILED back to CloudFormation.
  LambdaInvokeStepFunction:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: !Sub "LambdaInvokeStepFunction-${AWS::AccountId}"
      Description: Lambda to execute the step function
      Handler: index.handler
      Runtime: python3.12
      Role: !GetAtt 'LambdaInvokeStepFunctionRole.Arn'
      Timeout: 60
      Code:
        ZipFile: |
          import boto3
          import traceback
          import json
          import cfnresponse

          def handler(event, context):
              """Start the ETL state machine and signal the result to CloudFormation."""
              print(event)
              res = {}
              try:
                  # Delete requests have nothing to start; they only need a SUCCESS signal.
                  if event['RequestType'] != 'Delete':
                      step_function_client = boto3.client('stepfunctions')
                      step_function_input = {"comment": "Execute ETL Workflow for Redshift"}
                      response = step_function_client.start_execution(
                          stateMachineArn=event['ResourceProperties'].get('StepFunctionArn'),
                          input=json.dumps(step_function_input)
                      )
                      print(response)
              except Exception:
                  print(traceback.format_exc())
                  # Send a JSON-serializable payload (res); passing the builtin `input`
                  # made the response itself fail, leaving the stack waiting for a timeout.
                  cfnresponse.send(event, context, cfnresponse.FAILED, res)
                  # Return instead of re-raising so the invocation is not retried
                  # after CloudFormation has already been told about the failure.
                  return
              cfnresponse.send(event, context, cfnresponse.SUCCESS, res)

  # Invokes the Lambda above during stack operations, passing the state machine ARN.
  StartStepFunction:
    Type: Custom::LambdaStartStepFunction
    Properties:
      ServiceToken: !GetAtt [LambdaInvokeStepFunction, Arn]
      StepFunctionArn: !Ref RedshiftETLStepFunction

Outputs:
  RedshiftETLStepFunctionArn:
    Description: "The ARN of the step function used for ETL orchestration"
    Value: !Ref RedshiftETLStepFunction