AWSTemplateFormatVersion: "2010-09-09"
Description: "CloudFormation template to deploy a serverless data processing pipeline using the Amazon Redshift Data API, Amazon EventBridge and AWS Lambda"
Parameters:
  RedshiftClusterIdentifier:
    Description: Cluster identifier for your Amazon Redshift cluster
    Type: String
    Default: 'YourRedshiftClusterIdentifier'
  DbUsername:
    Description: Redshift database user name with access to run the SQL script.
    Type: String
    AllowedPattern: "([a-z])([a-z]|[0-9])*"
    Default: 'demo'
  DatabaseName:
    Description: Name of the Redshift database where the SQL script is run.
    Type: String
    Default: 'dev'
  RedshiftIAMRoleName:
    Description: Name of the AWS IAM role associated with the Redshift cluster
    Type: String
    Default: 'YourClusterRoleName'
  RedshiftIAMRoleARN:
    Description: ARN of the AWS IAM role associated with the Redshift cluster
    Type: String
    Default: 'YourClusterRoleARN'
  NotificationEmailId:
    Type: String
    Description: Email address to receive event notifications through Amazon SNS
    AllowedPattern: '^(([^<>()\[\]\\.,;:\s@"]+(\.[^<>()\[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$'
    ConstraintDescription: Provide a valid email address.
    Default: 'test@test.com'
  ExecutionSchedule:
    Type: String
    Description: Cron expression to schedule the extract-load-transform (ELT) process through an EventBridge rule
    Default: 'cron(0 11 ? * MON-FRI *)'
  SqlText:
    Type: String
    Description: SQL text to be run as part of the extract-load-transform (ELT) process
    Default: 'call run_elt_process();'
  s3bucketURI:
    Type: String
    Description: S3 bucket URI where data from the Redshift UNLOAD command will be written. Make sure the IAM role associated with the Redshift cluster has access to this bucket.
    Default: 's3://YourS3BucketURI/'
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "Input Parameters"
        Parameters:
          - RedshiftClusterIdentifier
          - DbUsername
          - DatabaseName
          - RedshiftIAMRoleName
          - NotificationEmailId
          - ExecutionSchedule
          - SqlText
          - s3bucketURI
          - RedshiftIAMRoleARN
Resources:
  RedshiftNotificationTopicSNS:
    Type: AWS::SNS::Topic
    Properties:
      KmsMasterKeyId: alias/aws/sns
      Subscription:
        - Endpoint: !Ref NotificationEmailId
          Protocol: email
  LambdaRedshiftDataApiETLRole:
    Type: AWS::IAM::Role
    Properties:
      Description: IAM role for Lambda to access Redshift and the SNS topic
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: RedshiftAccessPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: redshift:GetClusterCredentials
                Resource:
                  - !Sub arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:cluster:${RedshiftClusterIdentifier}
                  - !Sub arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbname:${RedshiftClusterIdentifier}/${DatabaseName}
                  - !Sub arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbuser:${RedshiftClusterIdentifier}/${DbUsername}
              - Effect: Allow
                Action:
                  - redshift-data:ExecuteStatement
                  - redshift-data:ListStatements
                  - redshift-data:GetStatementResult
                  - redshift-data:DescribeStatement
                Resource: "*"
        - PolicyName: SNSPublishPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: sns:Publish
                Resource: !Ref RedshiftNotificationTopicSNS
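  # Event flow implemented by the rules below: a scheduled EventBridge rule
  # invokes LambdaRedshiftDataApiETL to run the ELT stored procedure; the
  # Redshift Data API completion event (source "aws.redshift-data") triggers
  # LambdaRedshiftDataApiUnload to unload the processed data; the unload
  # completion event triggers the same function again to publish an SNS
  # notification.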
  EventBridgeScheduledTriggerETL1:
    Type: AWS::Events::Rule
    Properties:
      Description: >-
        Scheduled rule triggered periodically based on the cron expression.
        Invokes the LambdaRedshiftDataApiETL Lambda function to run the ELT
        stored procedure. 1st event rule.
      ScheduleExpression: !Ref ExecutionSchedule
      State: ENABLED
      Targets:
        - Arn: !GetAtt 'LambdaRedshiftDataApiETL.Arn'
          Id: EventBridgeScheduledTriggerETL1
          Input: !Sub "{\"Input\":{\"redshift_cluster_id\":\"${RedshiftClusterIdentifier}\",\"redshift_database\":\"${DatabaseName}\",\"redshift_user\":\"${DbUsername}\",\"action\":\"run_sql\",\"sql_text\":\"${SqlText}\",\"sns_topic_arn\":\"${RedshiftNotificationTopicSNS}\"}}"
  PermissionForScheduledEventToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref LambdaRedshiftDataApiETL
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt 'EventBridgeScheduledTriggerETL1.Arn'
  EventRedshiftDataApiTriggerUnload2:
    Type: AWS::Events::Rule
    Properties:
      Description: >-
        Event rule that captures the Redshift Data API completion event of the
        ELT stored procedure and triggers the LambdaRedshiftDataApiUnload
        Lambda function (3rd Lambda) to unload the processed data. 2nd event
        rule.
      EventPattern: !Sub "{\"source\": [\"aws.redshift-data\"],\"detail\": {\"principal\": [\"arn:aws:sts::${AWS::AccountId}:assumed-role/${LambdaRedshiftDataApiETLRole}/${LambdaRedshiftDataApiETL}\"]}}"
      State: ENABLED
      Targets:
        - Arn: !GetAtt 'LambdaRedshiftDataApiUnload.Arn'
          Id: EventRedshiftDataApiTriggerUnload2
          Input: !Sub "{\"Input\":{\"redshift_cluster_id\":\"${RedshiftClusterIdentifier}\",\"redshift_database\":\"${DatabaseName}\",\"redshift_user\":\"${DbUsername}\",\"action\":\"run_sql\",\"s3bucketname\":\"${s3bucketURI}\",\"redshift_iam_role\":\"${RedshiftIAMRoleARN}\"}}"
  PermissionForEventRedshiftDataApiTriggerUnload2:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref LambdaRedshiftDataApiUnload
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt 'EventRedshiftDataApiTriggerUnload2.Arn'
  EventRedshiftDataApiTriggerNotify3:
    Type: AWS::Events::Rule
    Properties:
      Description: >-
        Event rule that captures the Redshift Data API completion event of the
        unload statement and triggers the LambdaRedshiftDataApiUnload Lambda
        function again to publish a notification through Amazon SNS. 3rd event
        rule.
      EventPattern: !Sub "{\"source\": [\"aws.redshift-data\"],\"detail\": {\"principal\": [\"arn:aws:sts::${AWS::AccountId}:assumed-role/${LambdaRedshiftDataApiETLRole}/${LambdaRedshiftDataApiUnload}\"]}}"
      State: ENABLED
      Targets:
        - Arn: !GetAtt 'LambdaRedshiftDataApiUnload.Arn'
          Id: EventRedshiftDataApiTriggerNotify3
          InputTransformer:
            InputPathsMap:
              body: "$.detail"
            InputTemplate: !Sub "{\"Input\":{\"redshift_cluster_id\":\"${RedshiftClusterIdentifier}\",\"redshift_database\":\"${DatabaseName}\",\"redshift_user\":\"${DbUsername}\",\"action\":\"notify\",\"subject\":\"Unload of processed data completed in Amazon Redshift\",\"body\":<body>,\"sns_topic_arn\":\"${RedshiftNotificationTopicSNS}\"}}"
  PermissionForEventRedshiftDataApiTriggerNotify3:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref LambdaRedshiftDataApiUnload
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      SourceArn: !GetAtt 'EventRedshiftDataApiTriggerNotify3.Arn'
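  # For reference, a sketch of the completion event the two rules above match
  # on. The field set is illustrative and may differ by service version; the
  # rules only match on "source" and "detail.principal":
  # {
  #   "source": "aws.redshift-data",
  #   "detail-type": "Redshift Data Statement Status Change",
  #   "detail": {
  #     "principal": "arn:aws:sts::<account-id>:assumed-role/<role-name>/<function-name>",
  #     "statementId": "...",
  #     "state": "FINISHED"
  #   }
  # }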
  LambdaRedshiftDataApiETL:
    DependsOn:
      - LambdaRedshiftDataApiETLRole
    Type: AWS::Lambda::Function
    Properties:
      Description: >-
        Lambda function to asynchronously call the stored procedure with the
        Amazon Redshift Data API. This function loads and processes the raw
        data. 2nd Lambda function.
      Handler: index.handler
      Runtime: python3.7
      Role: !GetAtt 'LambdaRedshiftDataApiETLRole.Arn'
      Timeout: 30
      Code:
        ZipFile: |
          import json
          import time
          import unicodedata
          import traceback
          import sys
          from pip._internal import main

          # install the latest version of boto3 (the bundled version may predate
          # the redshift-data client)
          main(['install', '-I', '-q', 'boto3', '--target', '/tmp/', '--no-cache-dir', '--disable-pip-version-check'])
          sys.path.insert(0, '/tmp/')
          import boto3

          # initiate redshift-data client in boto3
          client = boto3.client("redshift-data")

          def handler(event, context):
              print(event)

              # input parameters passed from the caller event
              # cluster identifier for the Amazon Redshift cluster
              redshift_cluster_id = event['Input'].get('redshift_cluster_id')
              # database name for the Amazon Redshift cluster
              redshift_database = event['Input'].get('redshift_database')
              # database user in the Amazon Redshift cluster with access to run relevant SQL queries
              redshift_user = event['Input'].get('redshift_user')
              # Amazon SNS topic to be used to publish notifications to end-users
              sns_topic_arn = event['Input'].get('sns_topic_arn')
              # action to be taken by the lambda function. Allowed values: [run_sql, status_check, notify]
              action = event['Input'].get('action')
              # sql text to be run. e.g. call my_stored_procedure(input_params)
              sql_text = event['Input'].get('sql_text')
              # subject line to be used while publishing message through Amazon SNS
              subject = event['Input'].get('subject')
              # detailed body to be used while publishing message through Amazon SNS
              body = event['Input'].get('body')
              # query_id to input for action status_check
              query_id = event['Input'].get('query_id')

              try:
                  if action == 'run_sql':
                      # run the input SQL statement in the specified Amazon Redshift cluster
                      res = run_sql(client, sql_text, redshift_database, redshift_user, redshift_cluster_id)
                  elif action == "status_check":
                      # check status of a previously run query
                      res = status_check(client, query_id)
                  elif action == "notify":
                      # notify end-users by publishing a message through Amazon SNS
                      res = notify(sns_topic_arn, subject, body)
                  else:
                      raise ValueError("Invalid Action: " + action)
              except Exception as e:
                  subject = "Error:" + action + ":" + str(e)
                  body = traceback.format_exc()
                  notify(sns_topic_arn, subject, body)
                  raise

              return {'statusCode': 200, 'body': json.dumps(res)}

          def run_sql(client, sql_text, redshift_database, redshift_user, redshift_cluster_id, with_event=True):
              MAX_WAIT_CYCLES = 20
              attempts = 0
              print("Executing: {}".format(sql_text))
              res = client.execute_statement(Database=redshift_database, DbUser=redshift_user, Sql=sql_text, ClusterIdentifier=redshift_cluster_id, WithEvent=with_event)
              print(res)
              query_id = res["Id"]
              # poll briefly so obvious failures surface before the function returns
              while attempts < MAX_WAIT_CYCLES:
                  attempts += 1
                  time.sleep(1)
                  status = status_check(client, query_id)
                  if status in ("STARTED", "FAILED", "FINISHED"):
                      print("status is: {}".format(status))
                      break
              return query_id

          def status_check(client, query_id):
              desc = client.describe_statement(Id=query_id)
              status = desc["Status"]
              if status == "FAILED":
                  raise Exception('SQL query failed:' + query_id + ": " + desc["Error"])
              return status.strip('"')

          def notify(sns_topic_arn, subject, body):
              # SNS subjects must be printable and under 100 characters
              subject = ("".join(ch for ch in subject if unicodedata.category(ch)[0] != "C"))[0:99]
              body = str(body)
              sns_client = boto3.client('sns')
              response = sns_client.publish(
                  TargetArn=sns_topic_arn,
                  Message=json.dumps({'default': json.dumps("{}"), 'sms': subject, 'email': body}),
                  Subject=subject,
                  MessageStructure='json')
              return "message sent"
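  # To exercise the function above outside the schedule, it can be invoked
  # manually with a payload shaped like the one the scheduled rule sends
  # (a sketch; all values are placeholders, and AWS CLI v2 additionally needs
  # --cli-binary-format raw-in-base64-out):
  #   aws lambda invoke --function-name <LambdaRedshiftDataApiETL-name> \
  #     --payload '{"Input":{"redshift_cluster_id":"my-cluster","redshift_database":"dev","redshift_user":"demo","action":"run_sql","sql_text":"call run_elt_process();","sns_topic_arn":"arn:aws:sns:<region>:<account-id>:<topic>"}}' \
  #     out.json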
  RedshiftS3AccessForEventAppBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: AllowAllUsersToGetS3Objects
            Effect: Allow
            Action:
              - 's3:GetObject'
            Resource: 'arn:aws:s3:::event-driven-app-with-lambda-redshift/*'
          - Sid: AllowAllUsersToListS3Bucket
            Effect: Allow
            Action:
              - 's3:ListBucket'
            Resource: 'arn:aws:s3:::event-driven-app-with-lambda-redshift'
      Roles: [!Ref RedshiftIAMRoleName]
  SetupRedshiftObjectsLambdaRole:
    DependsOn: LambdaRedshiftDataApiETL
    Type: AWS::IAM::Role
    Properties:
      Description: IAM role for Lambda to invoke the LambdaRedshiftDataApiETL Lambda function
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: LambdaInvokePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:InvokeAsync
                Resource: !GetAtt [LambdaRedshiftDataApiETL, Arn]
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: "*"
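  # The next resource, together with the Custom::SetupRedshiftObjects resource
  # at the end of the template, implements the usual CloudFormation bootstrap
  # pattern: a helper Lambda function backs a custom resource that creates the
  # Redshift objects on stack creation and drops them on stack deletion.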
  LambdaSetupRedshiftObjects:
    Type: AWS::Lambda::Function
    DependsOn: SetupRedshiftObjectsLambdaRole
    Properties:
      Description: >-
        Lambda function to provision Redshift objects by invoking the
        LambdaRedshiftDataApiETL Lambda function as part of CloudFormation
        stack creation. This is the 1st Lambda function (initial setup).
      Handler: index.handler
      Role: !GetAtt 'SetupRedshiftObjectsLambdaRole.Arn'
      Runtime: python3.7
      Timeout: 30
      Code:
        ZipFile: |
          import json
          import boto3
          import cfnresponse
          import logging

          logging.basicConfig()
          logger = logging.getLogger(__name__)
          logger.setLevel(logging.INFO)

          def handler(event, context):
              logger.info(json.dumps(event))
              lambda_arn = event['ResourceProperties']['lambda_arn']
              redshift_cluster_id = event['ResourceProperties']['redshift_cluster_id']
              redshift_database = event['ResourceProperties']['redshift_database']
              redshift_user = event['ResourceProperties']['redshift_user']
              redshift_cluster_iam_role = event['ResourceProperties']['redshift_cluster_iam_role']
              sns_topic_arn = event['ResourceProperties']['sns_topic_arn']

              if event['RequestType'] == 'Delete':
                  # drop the objects created at stack creation
                  sql_text = '''
                  DROP PROCEDURE run_elt_process();
                  DROP MATERIALIZED VIEW IF EXISTS nyc_yellow_taxi_volume_analysis;
                  DROP TABLE IF EXISTS nyc_yellow_taxi;
                  '''
                  response = invoke_redshift_data_api_lambda(lambda_arn, redshift_cluster_id, redshift_database, redshift_user, sql_text, sns_topic_arn)
                  logger.info(response)
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Delete complete'})
              else:
                  # create the raw table, the materialized view and the ELT stored procedure
                  sql_text = '''
                  CREATE TABLE IF NOT EXISTS nyc_yellow_taxi
                  (pickup_date DATE
                  , pickup_datetime TIMESTAMP
                  , dropoff_datetime TIMESTAMP
                  , ratecode SMALLINT
                  , passenger_count SMALLINT
                  , trip_distance FLOAT4
                  , fare_amount FLOAT4
                  , total_amount FLOAT4
                  , payment_type SMALLINT
                  , vendorid VARCHAR(20))
                  SORTKEY(pickup_date);

                  DROP MATERIALIZED VIEW IF EXISTS nyc_yellow_taxi_volume_analysis;

                  CREATE MATERIALIZED VIEW nyc_yellow_taxi_volume_analysis
                  AS
                  SELECT
                  DATE_TRUNC('mon',pickup_date) pickup_month
                  , ROUND(AVG(trip_distance),2) avg_distance
                  , ROUND(AVG(fare_amount),2) avg_fare
                  , COUNT(1) total_trips
                  , SUM(trip_distance) total_distance_per_month
                  , SUM(fare_amount) total_fare_per_month
                  FROM nyc_yellow_taxi
                  GROUP BY 1;

                  CREATE OR REPLACE PROCEDURE run_elt_process()
                  AS $$
                  BEGIN
                  truncate table nyc_yellow_taxi;
                  COPY nyc_yellow_taxi FROM 's3://event-driven-app-with-lambda-redshift/nyc_yellow_taxi_raw/' IAM_ROLE '{}' region 'us-west-2' delimiter '|';
                  REFRESH MATERIALIZED VIEW nyc_yellow_taxi_volume_analysis;
                  END;
                  $$ LANGUAGE plpgsql;
                  '''
                  sql_text = sql_text.format(redshift_cluster_iam_role)
                  response = invoke_redshift_data_api_lambda(lambda_arn, redshift_cluster_id, redshift_database, redshift_user, sql_text, sns_topic_arn)
                  logger.info(response)
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Create complete'})

          def invoke_redshift_data_api_lambda(lambda_arn, redshift_cluster_id, redshift_database, redshift_user, sql_text, sns_topic_arn):
              client = boto3.client('lambda')
              lambda_payload = {
                  "Input": {
                      "redshift_cluster_id": redshift_cluster_id,
                      "redshift_database": redshift_database,
                      "redshift_user": redshift_user,
                      "action": "run_sql",
                      "sql_text": sql_text,
                      "sns_topic_arn": sns_topic_arn
                  }
              }
              # fire-and-forget: InvocationType 'Event' invokes the target asynchronously
              response = client.invoke(FunctionName=lambda_arn, InvocationType='Event', Payload=json.dumps(lambda_payload))
              return response
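  # Once the stack is created, the provisioned objects can be verified from
  # the Redshift query editor (illustrative; the table stays empty until the
  # ELT process has run):
  #   call run_elt_process();
  #   SELECT * FROM nyc_yellow_taxi_volume_analysis LIMIT 10;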
  LambdaRedshiftDataApiUnload:
    DependsOn:
      - LambdaRedshiftDataApiETLRole
    Type: AWS::Lambda::Function
    Properties:
      Description: >-
        Lambda function to run an asynchronous unload call with the Amazon
        Redshift Data API and publish custom notifications through SNS. 3rd
        Lambda function.
      Handler: index.handler
      Runtime: python3.7
      Role: !GetAtt 'LambdaRedshiftDataApiETLRole.Arn'
      Timeout: 30
      Code:
        ZipFile: |
          import json
          import time
          import unicodedata
          import traceback
          import sys
          from pip._internal import main

          # install the latest version of boto3 (the bundled version may predate
          # the redshift-data client)
          main(['install', '-I', '-q', 'boto3', '--target', '/tmp/', '--no-cache-dir', '--disable-pip-version-check'])
          sys.path.insert(0, '/tmp/')
          import boto3

          # initiate redshift-data client in boto3
          client = boto3.client("redshift-data")

          def handler(event, context):
              print(event)

              # input parameters passed from the caller event
              # cluster identifier for the Amazon Redshift cluster
              redshift_cluster_id = event['Input'].get('redshift_cluster_id')
              # database name for the Amazon Redshift cluster
              redshift_database = event['Input'].get('redshift_database')
              # database user in the Amazon Redshift cluster with access to run relevant SQL queries
              redshift_user = event['Input'].get('redshift_user')
              # Amazon SNS topic to be used to publish notifications to end-users
              sns_topic_arn = event['Input'].get('sns_topic_arn')
              # action to be taken by the lambda function. Allowed values: [run_sql, status_check, notify]
              action = event['Input'].get('action')
              # sql text to be run; for run_sql this is replaced below by the generated UNLOAD statement
              sql_text = event['Input'].get('sql_text')
              # subject line to be used while publishing message through Amazon SNS
              subject = event['Input'].get('subject')
              # detailed body to be used while publishing message through Amazon SNS
              body = event['Input'].get('body')
              # query_id to input for action status_check
              query_id = event['Input'].get('query_id')

              try:
                  if action == 'run_sql':
                      # s3 bucket to unload redshift data to
                      s3_bucket = event['Input'].get('s3bucketname')
                      redshift_iam_role = event['Input'].get('redshift_iam_role')
                      # build the UNLOAD statement and run it in the specified Amazon Redshift cluster
                      sql_text = "unload ('select * from nyc_yellow_taxi_volume_analysis where pickup_month=''2015-12-01 00:00:00'' ')" + \
                          " to '" + s3_bucket + "'" + " iam_role '" + redshift_iam_role + "' FORMAT PARQUET " + \
                          " ALLOWOVERWRITE PARTITION BY (PICKUP_MONTH); "
                      res = run_sql(client, sql_text, redshift_database, redshift_user, redshift_cluster_id)
                  elif action == "status_check":
                      # check status of a previously run query
                      res = status_check(client, query_id)
                  elif action == "notify":
                      # notify end-users by publishing a message through Amazon SNS
                      res = notify(sns_topic_arn, subject, body)
                  else:
                      raise ValueError("Invalid Action: " + action)
              except Exception as e:
                  subject = "Error:" + action + ":" + str(e)
                  body = traceback.format_exc()
                  notify(sns_topic_arn, subject, body)
                  raise

              return {'statusCode': 200, 'body': json.dumps(res)}

          def run_sql(client, sql_text, redshift_database, redshift_user, redshift_cluster_id, with_event=True):
              MAX_WAIT_CYCLES = 20
              attempts = 0
              print("Executing: {}".format(sql_text))
              res = client.execute_statement(Database=redshift_database, DbUser=redshift_user, Sql=sql_text, ClusterIdentifier=redshift_cluster_id, WithEvent=with_event)
              print(res)
              query_id = res["Id"]
              # poll briefly so obvious failures surface before the function returns
              while attempts < MAX_WAIT_CYCLES:
                  attempts += 1
                  time.sleep(1)
                  status = status_check(client, query_id)
                  if status in ("STARTED", "FAILED", "FINISHED"):
                      print("status is: {}".format(status))
                      break
              return query_id

          def status_check(client, query_id):
              desc = client.describe_statement(Id=query_id)
              status = desc["Status"]
              if status == "FAILED":
                  raise Exception('SQL query failed:' + query_id + ": " + desc["Error"])
              return status.strip('"')

          def notify(sns_topic_arn, subject, body):
              # SNS subjects must be printable and under 100 characters
              subject = ("".join(ch for ch in subject if unicodedata.category(ch)[0] != "C"))[0:99]
              body = str(body)
              sns_client = boto3.client('sns')
              response = sns_client.publish(
                  TargetArn=sns_topic_arn,
                  Message=json.dumps({'default': json.dumps("{}"), 'sms': subject, 'email': body}),
                  Subject=subject,
                  MessageStructure='json')
              return "message sent"
  SetupRedshiftObjects:
    Type: Custom::SetupRedshiftObjects
    DependsOn:
      - LambdaSetupRedshiftObjects
    Properties:
      ServiceToken: !GetAtt [LambdaSetupRedshiftObjects, Arn]
      lambda_arn: !GetAtt [LambdaRedshiftDataApiETL, Arn]
      redshift_cluster_id: !Ref RedshiftClusterIdentifier
      redshift_database: !Ref DatabaseName
      redshift_user: !Ref DbUsername
      redshift_cluster_iam_role: !Sub arn:aws:iam::${AWS::AccountId}:role/${RedshiftIAMRoleName}
      sns_topic_arn: !Ref RedshiftNotificationTopicSNS
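# A minimal deployment sketch (stack name and parameter values are
# placeholders; CAPABILITY_IAM is required because the template creates IAM
# resources):
#   aws cloudformation deploy \
#     --template-file event-driven-redshift-pipeline.yaml \
#     --stack-name redshift-data-api-pipeline \
#     --parameter-overrides RedshiftClusterIdentifier=my-cluster DbUsername=demo NotificationEmailId=me@example.com \
#     --capabilities CAPABILITY_IAM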