AWSTemplateFormatVersion: 2010-09-09 Description: "CloudFormation Template to deploy an ETL orchestration pipeline by using Redshift Data Api, Step Function and AWS Lambda" Parameters: RedshiftClusterEndpoint: Description: Redshift cluster endpoint including port number and database name Type: String Default: redshift-cluster.xxxxxx.region.redshift.amazonaws.com:5439/dev DbUsername: Description: Redshift database user name which has access to run SQL Script. Type: String AllowedPattern: "([a-z])([a-z]|[0-9])*" Default: 'awsuser' ETLScriptS3Path: Description: S3 URI to store the SQL script files. Please note, this automation would grant full access on your SQL script files' S3 bucket. Type: String Default: 's3://YourS3Bucket/Key/' Metadata: AWS::CloudFormation::Interface: ParameterGroups: - Label: default: Input Parameters Parameters: - RedshiftClusterEndpoint - DbUsername - ETLScriptS3Path Mappings: Script: # static values related to the Sales Data Pipeline. Config: SetupETLScript: 'setup_sales_data_pipeline.sql' ExecuteETLScript: 'run_sales_data_pipeline.sql' JobRunDate: '2021-06-26' Resources: LambdaRedshiftBatchDataAPIRole: Type: 'AWS::IAM::Role' Properties: Description : IAM Role for lambda to access Redshift and execute the Data API AssumeRolePolicyDocument: Statement: - Action: - 'sts:AssumeRole' Effect: Allow Principal: Service: - lambda.amazonaws.com Version: 2012-10-17 Path: / Policies: - PolicyName: RedshiftBatchDataApiPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 'redshift-data:BatchExecuteStatement' - 'redshift-data:ListStatements' - 'redshift-data:GetStatementResult' - 'redshift-data:DescribeStatement' Resource: '*' - Effect: Allow Action: - 'redshift:GetClusterCredentials' Resource: - !Sub - arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:cluster:${SourceRedshiftClusterIdentifier} - {SourceRedshiftClusterIdentifier: !Select [0, !Split [".", !Ref RedshiftClusterEndpoint]]} - !Sub - arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbname:${SourceRedshiftClusterIdentifier}/${RedshiftDatabaseName} - {SourceRedshiftClusterIdentifier: !Select [0, !Split [".", !Ref RedshiftClusterEndpoint]],RedshiftDatabaseName: !Select [1, !Split ["/", !Ref RedshiftClusterEndpoint]]} - !Sub - arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbuser:${SourceRedshiftClusterIdentifier}/${DbUsername} - {SourceRedshiftClusterIdentifier: !Select [0, !Split [".", !Ref RedshiftClusterEndpoint]]} - PolicyName: ETLScriptS3AccessPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 's3:GetBucketLocation' - 's3:GetObject' - 's3:ListBucket' Resource: - !Sub - arn:aws:s3:::${ETLScriptS3Bucket}/* - {ETLScriptS3Bucket: !Select [2, !Split ["/", !Ref ETLScriptS3Path]]} - !Sub - arn:aws:s3:::${ETLScriptS3Bucket} - {ETLScriptS3Bucket: !Select [2, !Split ["/", !Ref ETLScriptS3Path]]} ManagedPolicyArns: - 'arn:aws:iam::aws:policy/AWSLambdaExecute' LambdaRedshiftBatchDataAPI: Type: 'AWS::Lambda::Function' Properties: FunctionName: redshift_batch_data_api Description : Lambda function to asynchronously call SQL statements with Amazon Redshift Data API. Handler: index.handler Runtime: python3.8 Timeout: 300 Role: !GetAtt LambdaRedshiftBatchDataAPIRole.Arn Environment: Variables: redshift_cluster_identifier: !Sub - ${SourceRedshiftClusterIdentifier} - {SourceRedshiftClusterIdentifier: !Select [0, !Split [".", !Ref RedshiftClusterEndpoint]]} redshift_database_name: !Sub - ${RedshiftDatabaseName} - {RedshiftDatabaseName: !Select [1, !Split ["/", !Ref RedshiftClusterEndpoint]]} redshift_database_user: !Ref DbUsername job_run_date: !FindInMap [Script, Config, JobRunDate] Code: ZipFile: | import os import sys import json from pathlib import Path from pip._internal import main # install latest version of boto3 main(['install', '-I', '-q', 'boto3', '--target', '/tmp/','--no-cache-dir', '--disable-pip-version-check']) sys.path.insert(0, '/tmp/') import boto3 redshift_client = boto3.client('redshift-data') s3_client = boto3.client('s3') def get_sql_script_from_s3(script_s3_path): bucket, key = script_s3_path.replace("s3://", "").split("/", 1) obj = s3_client.get_object(Bucket=bucket, Key=key) return obj['Body'].read().decode('utf-8') def split_sql_statement(script_s3_path, job_run_date): script = get_sql_script_from_s3(script_s3_path).format(job_run_date=job_run_date) sql_statements = list(filter(None, script.split(';'))) return sql_statements def run_batch_sql(redshift_config, sql_statements, statement_name): # execute the input SQL statement in the specified Amazon Redshift cluster api_response = redshift_client.batch_execute_statement(ClusterIdentifier=redshift_config["redshift_cluster_id"] ,Database=redshift_config["redshift_database"] ,DbUser=redshift_config["redshift_user"] ,Sqls=sql_statements ,StatementName=statement_name) # return the sql_batch_id (a universally unique identifier generated by Amazon Redshift Data API) sql_batch_id = api_response["Id"] return sql_batch_id def get_sql_status(sql_id): desc = redshift_client.describe_statement(Id=sql_id) status = desc["Status"] if status == "FAILED": raise Exception('SQL query failed:' + sql_id + ": " + desc["Error"]) return status.strip('"') def handler(event, context): # action to be taken by the lambda function. # Allowed values: [setup_sales_data_pipeline, run_sales_data_pipeline, get_sql_status"] action = event['Input'].get('action') try: if action in ('setup_sales_data_pipeline','run_sales_data_pipeline'): redshift_config = {} # Get Database Credentials # cluster identifier for the Amazon Redshift cluster redshift_config["redshift_cluster_id"] = os.getenv('redshift_cluster_identifier') # database name for the Amazon Redshift cluster redshift_config["redshift_database"] = os.getenv('redshift_database_name') # database user in the Amazon Redshift cluster with access to execute relevant SQL queries redshift_config["redshift_user"] = os.getenv('redshift_database_user') # Get details to run the script job_run_date = os.getenv('job_run_date') etl_script_s3_path = event['Input'].get('etl_script_s3_path') sql_statements = split_sql_statement(etl_script_s3_path, job_run_date) statement_name = Path(etl_script_s3_path).name # execute the input SQL statement in the specified Amazon Redshift cluster sql_batch_id = run_batch_sql(redshift_config, sql_statements, statement_name) api_response = {'sql_batch_id': sql_batch_id} elif action == "get_sql_status": # sql_batch_id to input for action get_sql_status sql_batch_id = event['Input'].get('sql_batch_id') # get status of a previously executed data api call api_status = get_sql_status(sql_batch_id) api_response = {'status': api_status} else: raise ValueError("Invalid Action: " + action) except NameError as error: raise NameError(error) except Exception as exception: error_message = "Encountered exeption on:" + action + ":" + str(exception) raise Exception(error_message) return api_response StateMachineExecutionRole: Type: 'AWS::IAM::Role' Properties: Description : IAM Role for the state machine in step function to allow invocation of the lambda function AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - !Sub 'states.${AWS::Region}.amazonaws.com' Action: 'sts:AssumeRole' Path: / Policies: - PolicyName: StatesExecutionPolicy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 'lambda:InvokeFunction' Resource: - !GetAtt LambdaRedshiftBatchDataAPI.Arn - Effect: Allow Action: - 'lambda:InvokeFunction' Resource: - !Sub - ${LambdaRedshiftBatchDataAPIArn}:* - {LambdaRedshiftBatchDataAPIArn: !GetAtt LambdaRedshiftBatchDataAPI.Arn} RedshiftETLStepFunction: Type: 'AWS::StepFunctions::StateMachine' Properties: DefinitionString: !Sub - |- { "Comment": "A simple ETL workflow for loading dimension and fact tables", "StartAt": "setup_sales_data_pipeline", "States": { "setup_sales_data_pipeline": { "Comment": "Invoke lambda function", "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "TimeoutSeconds": 300, "HeartbeatSeconds": 60, "ResultPath": "$.sql_output", "ResultSelector": { "output.$": "$.Payload" }, "Parameters": { "FunctionName": "${BatchDataAPILambdaArn}:$LATEST", "Payload": { "Input": { "action": "setup_sales_data_pipeline", "etl_script_s3_path": "${ETLScriptS3Path}${SetupETLScriptFilename}" } } }, "Next": "wait_on_setup_sales_data_pipeline" }, "wait_on_setup_sales_data_pipeline": { "Comment": "Wait before status check", "Type": "Wait", "Seconds": 10, "Next": "setup_sales_data_pipeline_status_check" }, "setup_sales_data_pipeline_status_check": { "Comment": "Check Task Status", "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "TimeoutSeconds": 300, "HeartbeatSeconds": 60, "ResultPath": "$.step_output", "ResultSelector": { "output.$": "$.Payload" }, "Parameters": { "FunctionName": "${BatchDataAPILambdaArn}:$LATEST", "Payload": { "Input": { "action": "get_sql_status", "sql_batch_id.$": "$.sql_output.output.sql_batch_id" } } }, "Next": "is_setup_sales_data_pipeline_complete" }, "is_setup_sales_data_pipeline_complete": { "Comment": "check if previous step is complete", "Type": "Choice", "Choices": [ { "Variable": "$.step_output.output.status", "StringEquals": "FAILED", "Next": "sales_data_pipeline_failure" }, { "Variable": "$.step_output.output.status", "StringEquals": "FINISHED", "Next": "run_sales_data_pipeline" } ], "Default": "wait_on_setup_sales_data_pipeline" }, "run_sales_data_pipeline": { "Comment": "Invoke lambda function", "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "TimeoutSeconds": 300, "HeartbeatSeconds": 60, "ResultPath": "$.sql_output", "ResultSelector": { "output.$": "$.Payload" }, "Parameters": { "FunctionName": "${BatchDataAPILambdaArn}:$LATEST", "Payload": { "Input": { "action": "run_sales_data_pipeline", "etl_script_s3_path": "${ETLScriptS3Path}${ExecuteETLScriptFilename}" } } }, "Next": "wait_on_run_sales_data_pipeline" }, "wait_on_run_sales_data_pipeline": { "Comment": "Wait before status check", "Type": "Wait", "Seconds": 10, "Next": "run_sales_data_pipeline_status_check" }, "run_sales_data_pipeline_status_check": { "Comment": "Check Task Status", "Type": "Task", "Resource": "arn:aws:states:::lambda:invoke", "TimeoutSeconds": 300, "HeartbeatSeconds": 60, "ResultPath": "$.step_output", "ResultSelector": { "output.$": "$.Payload" }, "Parameters": { "FunctionName": "${BatchDataAPILambdaArn}:$LATEST", "Payload": { "Input": { "action": "get_sql_status", "sql_batch_id.$": "$.sql_output.output.sql_batch_id" } } }, "Next": "is_run_sales_data_pipeline_complete" }, "is_run_sales_data_pipeline_complete": { "Comment": "check if previous step is complete", "Type": "Choice", "Choices": [ { "Variable": "$.step_output.output.status", "StringEquals": "FAILED", "Next": "sales_data_pipeline_failure" }, { "Variable": "$.step_output.output.status", "StringEquals": "FINISHED", "Next": "sales_data_pipeline_success" } ], "Default": "wait_on_run_sales_data_pipeline" }, "sales_data_pipeline_failure": { "Type": "Fail", "Cause": "Failure on Sales Data Pipeline", "Error": "Error" }, "sales_data_pipeline_success": { "Type": "Pass", "Result": "sales_data_pipeline_success", "End": true } } } - BatchDataAPILambdaArn: !GetAtt LambdaRedshiftBatchDataAPI.Arn SetupETLScriptFilename: !FindInMap [Script, Config, SetupETLScript] ExecuteETLScriptFilename: !FindInMap [Script, Config, ExecuteETLScript] RoleArn: !GetAtt StateMachineExecutionRole.Arn Outputs: RedshiftETLStepFunctionArn: Description: "The ARN of the step function used for ETL orchestration" Value: Ref: RedshiftETLStepFunction LambdaRedshiftBatchDataAPI: Description: "The name of the Lambda Redshift Data API function" Value: Ref: LambdaRedshiftBatchDataAPI