AWSTemplateFormatVersion: "2010-09-09"
Description: "CloudFormation template to deploy an event-driven application with AWS Lambda and the Amazon Redshift Data API"
Parameters:
  RedshiftClusterIdentifier:
    Description: Cluster identifier of your Amazon Redshift cluster
    Type: String
    Default: 'redshift-cluster-1'
  DbUsername:
    Description: Redshift database user name with access to run the SQL script
    Type: String
    AllowedPattern: "([a-z])([a-z]|[0-9])*"
    Default: 'awsuser'
  DatabaseName:
    Description: Name of the Redshift database where the SQL script will run
    Type: String
    Default: 'dev'
  RedshiftIAMRoleName:
    Description: Name of the AWS IAM role associated with the Redshift cluster
    Type: String
    Default: 'myRedshiftRole'
  NotificationEmailId:
    Type: String
    Description: Email address to receive event notifications through Amazon SNS
    AllowedPattern: '^(([^<>()\[\]\\.,;:\s@"]+(\.[^<>()\[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$'
    ConstraintDescription: Provide a valid email address.
    Default: 'yourname@company.com'
  ExecutionSchedule:
    Type: String
    Description: Cron expression to schedule the extract-load-transform (ELT) process through an EventBridge rule
    Default: 'cron(0 11 ? * MON-FRI *)'
  SqlText:
    Type: String
    Description: SQL text to run as part of the extract-load-transform (ELT) process
    Default: 'call run_elt_process();'
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: "Input Parameters"
        Parameters:
          - RedshiftClusterIdentifier
          - DbUsername
          - DatabaseName
          - RedshiftIAMRoleName
          - NotificationEmailId
          - ExecutionSchedule
          - SqlText
Resources:
  RedshiftNotificationTopicSNS:
    Type: AWS::SNS::Topic
    Properties:
      KmsMasterKeyId: alias/aws/sns
      Subscription:
        - Endpoint: !Ref NotificationEmailId
          Protocol: email
  LambdaRedshiftDataApiETLRole:
    Type: AWS::IAM::Role
    Properties:
      Description: IAM role for Lambda to access Redshift and the SNS topic
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: RedshiftAccessPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: redshift:GetClusterCredentials
                Resource:
                  - !Sub arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:cluster:${RedshiftClusterIdentifier}
                  - !Sub arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbname:${RedshiftClusterIdentifier}/${DatabaseName}
                  - !Sub arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbuser:${RedshiftClusterIdentifier}/${DbUsername}
              - Effect: Allow
                Action:
                  - redshift-data:ExecuteStatement
                  - redshift-data:ListStatements
                  - redshift-data:GetStatementResult
                  - redshift-data:DescribeStatement
                Resource: "*"
        - PolicyName: SNSPublishPolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: Allow
                Action: sns:Publish
                Resource: !Ref RedshiftNotificationTopicSNS
  EventBridgeScheduledEventRule:
    Type: AWS::Events::Rule
    Properties:
      Description: Scheduled event rule, triggered periodically based on the cron expression
      ScheduleExpression: !Ref ExecutionSchedule
      State: "ENABLED"
      Targets:
        - Arn: !GetAtt 'LambdaRedshiftDataApiETL.Arn'
          Id: ScheduledEventRedshiftELT
          Input: !Sub "{\"Input\":{\"redshift_cluster_id\":\"${RedshiftClusterIdentifier}\",\"redshift_database\":\"${DatabaseName}\",\"redshift_user\":\"${DbUsername}\",\"action\":\"run_sql\",\"sql_text\":\"${SqlText}\",\"sns_topic_arn\":\"${RedshiftNotificationTopicSNS}\"}}"
  PermissionForScheduledEventToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref LambdaRedshiftDataApiETL
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn: !GetAtt 'EventBridgeScheduledEventRule.Arn'
  EventBridgeRedshiftEventRule:
    Type: AWS::Events::Rule
    Properties:
      Description: Redshift event rule to automatically capture Redshift Data API completion events and trigger the Lambda function to notify users
      EventPattern: !Sub "{\"source\": [\"aws.redshift-data\"],\"detail\": {\"principal\": [\"arn:aws:sts::${AWS::AccountId}:assumed-role/${LambdaRedshiftDataApiETLRole}/${LambdaRedshiftDataApiETL}\"]}}"
      State: "ENABLED"
      Targets:
        - Arn: !GetAtt 'LambdaRedshiftDataApiETL.Arn'
          Id: EventBridgeRedshiftEventRule
          InputTransformer:
            InputPathsMap:
              body: "$.detail"
            InputTemplate: !Sub "{\"Input\":{\"redshift_cluster_id\":\"${RedshiftClusterIdentifier}\",\"redshift_database\":\"${DatabaseName}\",\"redshift_user\":\"${DbUsername}\",\"action\":\"notify\",\"subject\":\"Extract Load Transform process completed in Amazon Redshift\",\"body\":<body>,\"sns_topic_arn\":\"${RedshiftNotificationTopicSNS}\"}}"
  PermissionForRedshiftEventToInvokeLambda:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !Ref LambdaRedshiftDataApiETL
      Action: "lambda:InvokeFunction"
      Principal: "events.amazonaws.com"
      SourceArn: !GetAtt 'EventBridgeRedshiftEventRule.Arn'
  LambdaRedshiftDataApiETL:
    DependsOn:
      - LambdaRedshiftDataApiETLRole
    Type: AWS::Lambda::Function
    Properties:
      Description: Lambda function to asynchronously call a stored procedure using the Amazon Redshift Data API and publish custom notifications through SNS
      Handler: index.handler
      Runtime: python3.7
      Role: !GetAtt 'LambdaRedshiftDataApiETLRole.Arn'
      Timeout: 30
      Code:
        ZipFile: |
          import json
          import time
          import unicodedata
          import traceback
          import sys
          from pip._internal import main

          # install latest version of boto3 at runtime so the redshift-data client is available
          main(['install', '-I', '-q', 'boto3', '--target', '/tmp/', '--no-cache-dir', '--disable-pip-version-check'])
          sys.path.insert(0, '/tmp/')
          import boto3

          # initiate redshift-data client in boto3
          client = boto3.client("redshift-data")

          def handler(event, context):
              print(event)
              # input parameters passed from the caller event
              # cluster identifier for the Amazon Redshift cluster
              redshift_cluster_id = event['Input'].get('redshift_cluster_id')
              # database name for the Amazon Redshift cluster
              redshift_database = event['Input'].get('redshift_database')
              # database user in the Amazon Redshift cluster with access to run relevant SQL queries
              redshift_user = event['Input'].get('redshift_user')
              # Amazon SNS topic used to publish notifications to end users
              sns_topic_arn = event['Input'].get('sns_topic_arn')
              # action to be taken by the Lambda function. Allowed values: [run_sql, status_check, notify]
              action = event['Input'].get('action')
              # SQL text to be run, e.g. call my_stored_procedure(input_params)
              sql_text = event['Input'].get('sql_text')
              # subject line used while publishing a message through Amazon SNS
              subject = event['Input'].get('subject')
              # detailed body used while publishing a message through Amazon SNS
              body = event['Input'].get('body')
              # query_id to input for action status_check
              query_id = event['Input'].get('query_id')
              try:
                  if action == 'run_sql':
                      # run the input SQL statement in the specified Amazon Redshift cluster
                      res = run_sql(client, sql_text, redshift_database, redshift_user, redshift_cluster_id)
                  elif action == "status_check":
                      # check status of a previously run query
                      res = status_check(client, query_id)
                  elif action == "notify":
                      # notify end users by publishing a message to Amazon SNS
                      res = notify(sns_topic_arn, subject, body)
                  else:
                      raise ValueError("Invalid Action: " + action)
              except Exception as e:
                  subject = "Error:" + action + ":" + str(e)
                  body = traceback.format_exc()
                  notify(sns_topic_arn, subject, body)
                  raise
              return {'statusCode': 200, 'body': json.dumps(res)}

          def run_sql(client, sql_text, redshift_database, redshift_user, redshift_cluster_id, with_event=True):
              print("Executing: {}".format(sql_text))
              res = client.execute_statement(Database=redshift_database, DbUser=redshift_user, Sql=sql_text,
                                             ClusterIdentifier=redshift_cluster_id, WithEvent=with_event)
              print(res)
              query_id = res["Id"]
              # poll until the statement has at least started; completion is captured
              # asynchronously by the EventBridge rule (WithEvent=True)
              while True:
                  time.sleep(1)
                  status = status_check(client, query_id)
                  if status in ("STARTED", "FAILED", "FINISHED"):
                      print("status is: {}".format(status))
                      break
              return query_id

          def status_check(client, query_id):
              desc = client.describe_statement(Id=query_id)
              status = desc["Status"]
              if status == "FAILED":
                  raise Exception('SQL query failed:' + query_id + ": " + desc["Error"])
              return status.strip('"')

          def notify(sns_topic_arn, subject, body):
              # strip control characters and truncate to fit the SNS subject length limit
              subject = ("".join(ch for ch in subject if unicodedata.category(ch)[0] != "C"))[0:99]
              body = str(body)
              sns_client = boto3.client('sns')
              sns_client.publish(
                  TargetArn=sns_topic_arn,
                  Message=json.dumps({'default': json.dumps("{}"), 'sms': subject, 'email': body}),
                  Subject=subject,
                  MessageStructure='json'
              )
              return "message sent"
  RedshiftS3AccessForEventAppBucket:
    Type: 'AWS::IAM::ManagedPolicy'
    Properties:
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Sid: AllowAllUsersToGetS3Objects
            Effect: Allow
            Action:
              - 's3:GetObject'
            Resource: 'arn:aws:s3:::event-driven-app-with-lambda-redshift/*'
          - Sid: AllowAllUsersToListS3Bucket
            Effect: Allow
            Action:
              - 's3:ListBucket'
            Resource: 'arn:aws:s3:::event-driven-app-with-lambda-redshift'
      Roles: [!Ref RedshiftIAMRoleName]
  SetupRedshiftObjectsLambdaRole:
    DependsOn: LambdaRedshiftDataApiETL
    Type: AWS::IAM::Role
    Properties:
      Description: IAM role for Lambda to invoke the LambdaRedshiftDataApiETL function
      AssumeRolePolicyDocument:
        Version: 2012-10-17
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: LambdaInvokePolicy
          PolicyDocument:
            Version: 2012-10-17
            Statement:
              - Effect: "Allow"
                Action:
                  - lambda:InvokeFunction
                  - lambda:InvokeAsync
                Resource: !GetAtt [LambdaRedshiftDataApiETL, Arn]
              - Effect: "Allow"
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: "*"
  LambdaSetupRedshiftObjects:
    Type: AWS::Lambda::Function
    DependsOn: SetupRedshiftObjectsLambdaRole
    Properties:
      Description: Lambda function to provision Redshift objects by invoking the LambdaRedshiftDataApiETL function during CloudFormation stack creation
      Handler: index.handler
      Role: !GetAtt 'SetupRedshiftObjectsLambdaRole.Arn'
      Runtime: python3.7
      Timeout: 30
      Code:
        ZipFile: |
          import json
          import boto3
          import cfnresponse
          import logging

          logging.basicConfig()
          logger = logging.getLogger(__name__)
          logger.setLevel(logging.INFO)

          def handler(event, context):
              logger.info(json.dumps(event))
              lambda_arn = event['ResourceProperties']['lambda_arn']
              redshift_cluster_id = event['ResourceProperties']['redshift_cluster_id']
              redshift_database = event['ResourceProperties']['redshift_database']
              redshift_user = event['ResourceProperties']['redshift_user']
              redshift_cluster_iam_role = event['ResourceProperties']['redshift_cluster_iam_role']
              sns_topic_arn = event['ResourceProperties']['sns_topic_arn']
              if event['RequestType'] == 'Delete':
                  # clean up the Redshift objects when the stack is deleted
                  sql_text = '''
                  DROP PROCEDURE run_elt_process();
                  DROP MATERIALIZED VIEW IF EXISTS nyc_yellow_taxi_volume_analysis;
                  DROP TABLE IF EXISTS nyc_yellow_taxi;
                  '''
                  response = invoke_redshift_data_api_lambda(lambda_arn, redshift_cluster_id, redshift_database, redshift_user, sql_text, sns_topic_arn)
                  logger.info(response)
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Delete complete'})
              else:
                  # create the table, materialized view, and ELT stored procedure
                  sql_text = '''
                  CREATE TABLE IF NOT EXISTS nyc_yellow_taxi
                  (pickup_date DATE
                  , pickup_datetime TIMESTAMP
                  , dropoff_datetime TIMESTAMP
                  , ratecode SMALLINT
                  , passenger_count SMALLINT
                  , trip_distance FLOAT4
                  , fare_amount FLOAT4
                  , total_amount FLOAT4
                  , payment_type SMALLINT
                  , vendorid VARCHAR(20))
                  SORTKEY(pickup_date);

                  DROP MATERIALIZED VIEW IF EXISTS nyc_yellow_taxi_volume_analysis;

                  CREATE MATERIALIZED VIEW nyc_yellow_taxi_volume_analysis AS
                  SELECT DATE_TRUNC('mon', pickup_date) pickup_month
                  , ROUND(AVG(trip_distance), 2) avg_distance
                  , ROUND(AVG(fare_amount), 2) avg_fare
                  , COUNT(1) total_trips
                  , SUM(trip_distance) total_distance_per_month
                  , SUM(fare_amount) total_fare_per_month
                  FROM nyc_yellow_taxi
                  GROUP BY 1;

                  CREATE OR REPLACE PROCEDURE run_elt_process()
                  AS $$
                  BEGIN
                  truncate table nyc_yellow_taxi;
                  COPY nyc_yellow_taxi FROM 's3://event-driven-app-with-lambda-redshift/nyc_yellow_taxi_raw/'
                  IAM_ROLE '{}'
                  region 'us-west-2' delimiter '|';
                  REFRESH MATERIALIZED VIEW nyc_yellow_taxi_volume_analysis;
                  END;
                  $$ LANGUAGE plpgsql;
                  '''
                  # substitute the Redshift cluster IAM role into the COPY statement
                  sql_text = sql_text.format(redshift_cluster_iam_role)
                  response = invoke_redshift_data_api_lambda(lambda_arn, redshift_cluster_id, redshift_database, redshift_user, sql_text, sns_topic_arn)
                  logger.info(response)
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': 'Create complete'})

          def invoke_redshift_data_api_lambda(lambda_arn, redshift_cluster_id, redshift_database, redshift_user, sql_text, sns_topic_arn):
              client = boto3.client('lambda')
              lambda_payload = {
                  "Input": {
                      "redshift_cluster_id": redshift_cluster_id,
                      "redshift_database": redshift_database,
                      "redshift_user": redshift_user,
                      "action": "run_sql",
                      "sql_text": sql_text,
                      "sns_topic_arn": sns_topic_arn
                  }
              }
              # invoke asynchronously; completion is reported through EventBridge/SNS
              response = client.invoke(
                  FunctionName=lambda_arn,
                  InvocationType='Event',
                  Payload=json.dumps(lambda_payload)
              )
              return response
  SetupRedshiftObjects:
    Type: Custom::SetupRedshiftObjects
    DependsOn:
      - LambdaSetupRedshiftObjects
    Properties:
      ServiceToken: !GetAtt [LambdaSetupRedshiftObjects, Arn]
      lambda_arn: !GetAtt [LambdaRedshiftDataApiETL, Arn]
      redshift_cluster_id: !Ref RedshiftClusterIdentifier
      redshift_database: !Ref DatabaseName
      redshift_user: !Ref DbUsername
      redshift_cluster_iam_role: !Sub arn:aws:iam::${AWS::AccountId}:role/${RedshiftIAMRoleName}
      sns_topic_arn: !Ref RedshiftNotificationTopicSNS