AWSTemplateFormatVersion: "2010-09-09"
Description: "CloudFormation Template to deploy an Event Driven Application with AWS Lambda and Amazon Redshift Data API"
# Stack inputs: the target Redshift cluster, database and user, the IAM role
# attached to the cluster, the notification address, and the ELT schedule/SQL.
Parameters:
  # Cluster that the Redshift Data API statements are run against.
  RedshiftClusterIdentifier:
    Type: String
    Default: "redshift-cluster-1"
    Description: Cluster Identifier for your redshift cluster
  # Must start with a lowercase letter, followed by lowercase letters or digits.
  DbUsername:
    Type: String
    Default: "awsuser"
    AllowedPattern: '([a-z])([a-z]|[0-9])*'
    Description: Redshift database user name which has access to run SQL Script.
  DatabaseName:
    Type: String
    Default: "dev"
    Description: Name of the Redshift database where SQL Script would be run.
  # Role name only (not an ARN); the template builds the ARN where it needs one.
  RedshiftIAMRoleName:
    Type: String
    Default: "myRedshiftRole"
    Description: AWS IAM Role Name associated with the Redshift cluster
  # Subscribed to the SNS topic created by this stack.
  NotificationEmailId:
    Type: String
    Default: "yourname@company.com"
    AllowedPattern: '^(([^<>()\[\]\\.,;:\s@"]+(\.[^<>()\[\]\\.,;:\s@"]+)*)|(".+"))@((\[[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}\.[0-9]{1,3}])|(([a-zA-Z\-0-9]+\.)+[a-zA-Z]{2,}))$'
    ConstraintDescription: provide a valid email address.
    Description: EmailId to send event notifications through Amazon SNS
  # Used as the ScheduleExpression of the scheduled EventBridge rule.
  ExecutionSchedule:
    Type: String
    Default: "cron(0 11 ? * MON-FRI *)"
    Description: Cron expression to schedule extract-load-transform (ELT) process through EventBridge rule
  # Substituted verbatim into the JSON input of the scheduled rule.
  SqlText:
    Type: String
    Default: "call run_elt_process();"
    Description: SQL Text to be run as part of the extract-load-transform (ELT) process
# Console presentation only: lists every parameter under one labelled group,
# in the order given here.
Metadata:
  AWS::CloudFormation::Interface:
    ParameterGroups:
      - Label:
          default: Input Parameters
        Parameters:
          - RedshiftClusterIdentifier
          - DbUsername
          - DatabaseName
          - RedshiftIAMRoleName
          - NotificationEmailId
          - ExecutionSchedule
          - SqlText
Resources:
  # SNS topic used for ELT notifications. The address supplied in
  # NotificationEmailId is subscribed by e-mail, and the topic is configured
  # with the alias/aws/sns KMS key.
  RedshiftNotificationTopicSNS:
    Type: "AWS::SNS::Topic"
    Properties:
      Subscription:
        - Protocol: email
          Endpoint:
            Ref: NotificationEmailId
      KmsMasterKeyId: "alias/aws/sns"
  # Execution role assumed by the LambdaRedshiftDataApiETL function.
  # Inline policies grant:
  #   - redshift:GetClusterCredentials for the configured cluster, database
  #     and database user;
  #   - the Redshift Data API calls the function makes (any resource);
  #   - sns:Publish on the notification topic created by this stack.
  LambdaRedshiftDataApiETLRole:
    Type: AWS::IAM::Role
    Properties:
      Description: IAM Role for lambda to access Redshift and SNS topic
      Path: /
      ManagedPolicyArns:
        # NOTE(review): the function in this template declares no VpcConfig, so
        # AWSLambdaBasicExecutionRole may be sufficient - confirm before narrowing.
        - arn:aws:iam::aws:policy/service-role/AWSLambdaVPCAccessExecutionRole
      AssumeRolePolicyDocument:
        # Quoted so that YAML 1.1 loaders keep the policy version a string
        # rather than resolving it to a date.
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: RedshiftAccessPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              # Temporary database credentials, limited to the configured
              # cluster, database name and database user.
              - Effect: Allow
                Action: redshift:GetClusterCredentials
                Resource:
                  - !Sub "arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:cluster:${RedshiftClusterIdentifier}"
                  - !Sub "arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbname:${RedshiftClusterIdentifier}/${DatabaseName}"
                  - !Sub "arn:aws:redshift:${AWS::Region}:${AWS::AccountId}:dbuser:${RedshiftClusterIdentifier}/${DbUsername}"
              - Effect: Allow
                Action:
                  - redshift-data:ExecuteStatement
                  - redshift-data:ListStatements
                  - redshift-data:GetStatementResult
                  - redshift-data:DescribeStatement
                Resource: "*"
        - PolicyName: SNSPublishPolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              # Ref on an SNS topic yields its ARN.
              - Effect: Allow
                Action: sns:Publish
                Resource: !Ref RedshiftNotificationTopicSNS
  # Scheduled rule: on the ExecutionSchedule expression it invokes the ETL
  # Lambda with a "run_sql" request carrying the configured cluster, database,
  # user, SQL text and notification topic.
  EventBridgeScheduledEventRule:
    Type: AWS::Events::Rule
    Properties:
      Description: Scheduled Event Rule to be triggered periodically based on cron expression.
      State: ENABLED
      ScheduleExpression: !Ref ExecutionSchedule
      Targets:
        - Id: ScheduledEventRedshiftELT
          Arn: !GetAtt LambdaRedshiftDataApiETL.Arn
          # Single-quoted so the JSON needs no backslash escapes. SqlText is
          # substituted as-is, so a value containing a double quote would yield
          # invalid JSON.
          Input: !Sub '{"Input":{"redshift_cluster_id":"${RedshiftClusterIdentifier}","redshift_database":"${DatabaseName}","redshift_user":"${DbUsername}","action":"run_sql","sql_text":"${SqlText}","sns_topic_arn":"${RedshiftNotificationTopicSNS}"}}'
  # Resource policy entry that lets EventBridge invoke the ETL Lambda, limited
  # to invocations originating from the scheduled rule.
  PermissionForScheduledEventToInvokeLambda:
    Type: "AWS::Lambda::Permission"
    Properties:
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      FunctionName: !Ref LambdaRedshiftDataApiETL
      SourceArn: !GetAtt EventBridgeScheduledEventRule.Arn
  # Redshift event rule: captures Redshift Data API events and triggers the
  # Lambda function to notify users (for example when the stored procedure run
  # has completed).
  #
  # This text used to sit in a resource-level "Description" key. That key is
  # not a documented resource attribute and duplicated Properties.Description,
  # so it is kept here as a comment instead.
  EventBridgeRedshiftEventRule:
    Type: AWS::Events::Rule
    Properties:
      Description: Respond to Redshift-data events
      State: ENABLED
      # Matches aws.redshift-data events whose principal is the assumed-role
      # session built from the ETL role name and the ETL function name.
      EventPattern: !Sub '{"source": ["aws.redshift-data"],"detail": {"principal": ["arn:aws:sts::${AWS::AccountId}:assumed-role/${LambdaRedshiftDataApiETLRole}/${LambdaRedshiftDataApiETL}"]}}'
      Targets:
        - Arn: !GetAtt LambdaRedshiftDataApiETL.Arn
          Id: EventBridgeRedshiftEventRule
          InputTransformer:
            # <body> in the template below is replaced by the event's detail.
            InputPathsMap:
              body: "$.detail"
            InputTemplate: !Sub '{"Input":{"redshift_cluster_id":"${RedshiftClusterIdentifier}","redshift_database":"${DatabaseName}","redshift_user":"${DbUsername}","action":"notify","subject":"Extract Load Transform process completed in Amazon Redshift","body":<body>,"sns_topic_arn":"${RedshiftNotificationTopicSNS}"}}'
  # Resource policy entry that lets EventBridge invoke the ETL Lambda, limited
  # to invocations originating from the Redshift Data API event rule.
  PermissionForRedshiftEventToInvokeLambda:
    Type: "AWS::Lambda::Permission"
    Properties:
      Action: lambda:InvokeFunction
      Principal: events.amazonaws.com
      FunctionName: !Ref LambdaRedshiftDataApiETL
      SourceArn: !GetAtt EventBridgeRedshiftEventRule.Arn
  # Lambda that runs SQL through the Redshift Data API and publishes
  # notifications to SNS. It is invoked by both EventBridge rules and by the
  # setup Lambda, always with a payload of the form {"Input": {...}}.
  #
  # No explicit DependsOn is needed: the !GetAtt on the role below already
  # orders this function after LambdaRedshiftDataApiETLRole.
  LambdaRedshiftDataApiETL:
    Type: AWS::Lambda::Function
    Properties:
      Description: Lambda function to asynchronous call stored procedure with Amazon Redshift Data API and publish custom notifications through SNS
      Handler: index.handler
      # python3.7 is a deprecated Lambda runtime and can no longer be used to
      # create functions, so a currently supported runtime is used.
      Runtime: python3.12
      Role: !GetAtt LambdaRedshiftDataApiETLRole.Arn
      Timeout: 30
      Code:
        ZipFile: |
          import json
          import time
          import unicodedata
          import traceback
          import boto3

          # The boto3 bundled with the Lambda runtime provides the redshift-data
          # client, so nothing is pip-installed at cold start (that needed network
          # access and ate into the function timeout).
          client = boto3.client("redshift-data")

          # Statement statuses at which run_sql stops polling. ABORTED is included
          # so an aborted statement cannot keep the loop spinning until timeout.
          STOP_POLLING_STATUSES = ("STARTED", "FAILED", "FINISHED", "ABORTED")

          def handler(event, context):
              """Dispatch on event['Input']['action']: run_sql, status_check or notify.

              Returns {'statusCode': 200, 'body': <JSON-encoded result>}.
              On any error a notification carrying the traceback is attempted on
              the SNS topic and the original exception is re-raised.
              """
              print(event)

              # input parameters passed from the caller event
              params = event['Input']
              # cluster identifier for the Amazon Redshift cluster
              redshift_cluster_id = params.get('redshift_cluster_id')
              # database name for the Amazon Redshift cluster
              redshift_database = params.get('redshift_database')
              # database user in the Amazon Redshift cluster with access to run relevant SQL queries
              redshift_user = params.get('redshift_user')
              # Amazon SNS topic to be used to publish notifications to end-users
              sns_topic_arn = params.get('sns_topic_arn')
              # action to be taken by the lambda function. Allowed values: [run_sql, status_check, notify]
              action = params.get('action')
              # sql text to be run. e.g. call my_stored_procedure(input_params)
              sql_text = params.get('sql_text')
              # subject line to be used while publishing message through Amazon SNS
              subject = params.get('subject')
              # detailed body to be used while publishing message through Amazon SNS
              body = params.get('body')
              # query_id to input for action status_check
              query_id = params.get('query_id')

              try:
                  if action == 'run_sql':
                      # run the input SQL statement in the specified Amazon Redshift cluster
                      res = run_sql(client, sql_text, redshift_database, redshift_user, redshift_cluster_id)
                  elif action == "status_check":
                      # check status of a previously run query
                      res = status_check(client, query_id)
                  elif action == "notify":
                      # notify to end-users by publishing message in Amazon SNS service
                      res = notify(sns_topic_arn, subject, body)
                  else:
                      # str() keeps a missing action (None) from raising TypeError here
                      raise ValueError("Invalid Action: " + str(action))
              except Exception as e:
                  subject = "Error:" + str(action) + ":" + str(e)
                  body = traceback.format_exc()
                  try:
                      notify(sns_topic_arn, subject, body)
                  except Exception:
                      # A failed notification must not hide the original error.
                      traceback.print_exc()
                  raise

              return {'statusCode': 200, 'body': json.dumps(res)}

          def run_sql(client, sql_text, redshift_database, redshift_user, redshift_cluster_id, with_event=True):
              """Submit sql_text through the Data API and return the statement id.

              Polls once per second until the statement has at least started (or
              reached a terminal status). status_check raises if it FAILED.
              """
              print("Executing: {}".format(sql_text))

              res = client.execute_statement(Database=redshift_database, DbUser=redshift_user, Sql=sql_text,
                                             ClusterIdentifier=redshift_cluster_id, WithEvent=with_event)
              print(res)
              query_id = res["Id"]
              while True:
                  time.sleep(1)
                  status = status_check(client, query_id)
                  if status in STOP_POLLING_STATUSES:
                      print("status is: {}".format(status))
                      break
              return query_id

          def status_check(client, query_id):
              """Return the status of statement query_id; raise if it FAILED."""
              desc = client.describe_statement(Id=query_id)
              status = desc["Status"]
              if status == "FAILED":
                  raise Exception('SQL query failed:' + query_id + ": " + str(desc.get("Error")))
              return status.strip('"')

          def notify(sns_topic_arn, subject, body):
              """Publish body to the SNS topic and return "message sent".

              Control characters are removed from subject and it is cut to 99
              characters before use.
              """
              subject = ("".join(ch for ch in subject if unicodedata.category(ch)[0] != "C"))[0:99]
              body = str(body)
              sns_client = boto3.client('sns')
              response = sns_client.publish(
                  TargetArn=sns_topic_arn,
                  Message=json.dumps({'default': json.dumps("{}"),
                                      'sms': subject,
                                      'email': body}),
                  Subject=subject,
                  MessageStructure='json'
              )
              return "message sent"
  # Managed policy attached to the existing role named by RedshiftIAMRoleName.
  # It allows listing the event-driven-app-with-lambda-redshift bucket and
  # reading its objects (the bucket the stored procedure's COPY reads from).
  RedshiftS3AccessForEventAppBucket:
    Type: AWS::IAM::ManagedPolicy
    Properties:
      Roles:
        - !Ref RedshiftIAMRoleName
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Sid: AllowAllUsersToGetS3Objects
            Effect: Allow
            Action:
              - s3:GetObject
            Resource: "arn:aws:s3:::event-driven-app-with-lambda-redshift/*"
          - Sid: AllowAllUsersToListS3Bucket
            Effect: Allow
            Action:
              - s3:ListBucket
            Resource: "arn:aws:s3:::event-driven-app-with-lambda-redshift"
  # Execution role for the setup Lambda: it may invoke the ETL Lambda and
  # write CloudWatch Logs.
  #
  # No explicit DependsOn is needed: the !GetAtt in the invoke statement
  # already orders this role after LambdaRedshiftDataApiETL.
  SetupRedshiftObjectsLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      Description: IAM Role for lambda to invoke LambdaRedshiftDataApiETL lambda function
      Path: /
      AssumeRolePolicyDocument:
        # Quoted so that YAML 1.1 loaders keep the policy version a string
        # rather than resolving it to a date.
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: LambdaInvokePolicy
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              # Invocation rights are limited to the ETL function.
              - Effect: Allow
                Action:
                  - lambda:InvokeFunction
                  - lambda:InvokeAsync
                Resource: !GetAtt LambdaRedshiftDataApiETL.Arn
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: "*"
  # Backing function for the SetupRedshiftObjects custom resource. On
  # create/update it submits the DDL for the sample table, materialized view
  # and run_elt_process procedure; on delete it submits the matching DROPs.
  # The SQL is handed to the ETL Lambda asynchronously (InvocationType
  # 'Event'), so the response to CloudFormation reflects only whether the
  # request was submitted, not whether the SQL succeeded.
  #
  # No explicit DependsOn is needed: the !GetAtt on the role below already
  # orders this function after SetupRedshiftObjectsLambdaRole.
  LambdaSetupRedshiftObjects:
    Type: AWS::Lambda::Function
    Properties:
      Description: Lambda function to provision Redshift objects by invoking LambdaRedshiftDataApiETL lambda function as part of the CloudFormation initiation
      Handler: index.handler
      Role: !GetAtt SetupRedshiftObjectsLambdaRole.Arn
      # python3.7 is a deprecated Lambda runtime and can no longer be used to
      # create functions, so a currently supported runtime is used.
      Runtime: python3.12
      Timeout: 30
      Code:
        ZipFile: |
          import json
          import boto3
          import cfnresponse
          import logging

          logging.basicConfig()
          logger = logging.getLogger(__name__)
          logger.setLevel(logging.INFO)

          # Statements submitted when the custom resource is deleted.
          DELETE_SQL = '''
                  DROP PROCEDURE run_elt_process();
                  DROP MATERIALIZED VIEW IF EXISTS  nyc_yellow_taxi_volume_analysis;
                  DROP TABLE IF EXISTS nyc_yellow_taxi;
                  '''

          # Statements submitted on create/update. The single {} placeholder is
          # filled with the ARN of the IAM role attached to the cluster.
          CREATE_SQL = '''
                  CREATE TABLE IF NOT EXISTS nyc_yellow_taxi
                  (pickup_date        DATE
                  , pickup_datetime  TIMESTAMP
                  , dropoff_datetime TIMESTAMP
                  , ratecode         SMALLINT
                  , passenger_count  SMALLINT
                  , trip_distance    FLOAT4
                  , fare_amount      FLOAT4
                  , total_amount     FLOAT4
                  , payment_type     SMALLINT
                  , vendorid         VARCHAR(20))
                  SORTKEY(pickup_date);

                  DROP MATERIALIZED VIEW IF EXISTS  nyc_yellow_taxi_volume_analysis;

                  CREATE MATERIALIZED VIEW nyc_yellow_taxi_volume_analysis
                  AS
                  SELECT
                    DATE_TRUNC('mon',pickup_date) pickup_month
                  , ROUND(AVG(trip_distance),2) avg_distance
                  , ROUND(AVG(fare_amount),2) avg_fare
                  , COUNT(1) total_trips
                  , SUM(trip_distance) total_distance_per_month
                  , SUM(fare_amount) total_fare_per_month
                  FROM nyc_yellow_taxi
                  GROUP BY 1;

                  CREATE OR REPLACE PROCEDURE run_elt_process()
                  AS $$
                  BEGIN
                    truncate table nyc_yellow_taxi;
                    COPY nyc_yellow_taxi FROM 's3://event-driven-app-with-lambda-redshift/nyc_yellow_taxi_raw/'
                    IAM_ROLE '{}'
                    region 'us-west-2' delimiter '|';
                    REFRESH MATERIALIZED VIEW nyc_yellow_taxi_volume_analysis;
                  END;
                  $$ LANGUAGE plpgsql;
                  '''

          def handler(event, context):
              """Custom-resource entry point: submit the setup or teardown SQL.

              A response is always sent to CloudFormation. Without one the stack
              operation would wait for the custom resource until it times out.
              """
              logger.info(json.dumps(event))
              is_delete = event.get('RequestType') == 'Delete'
              try:
                  props = event['ResourceProperties']
                  if is_delete:
                      sql_text = DELETE_SQL
                  else:
                      sql_text = CREATE_SQL.format(props['redshift_cluster_iam_role'])
                  response = invoke_redshift_data_api_lambda(
                      props['lambda_arn'], props['redshift_cluster_id'], props['redshift_database'],
                      props['redshift_user'], sql_text, props['sns_topic_arn'])
                  logger.info(response)
              except Exception as e:
                  logger.exception("Could not submit the Redshift setup SQL")
                  # Teardown is best effort: a failed cleanup is logged but must
                  # not block stack deletion or rollback. Create/update failures
                  # are reported so the stack operation fails promptly.
                  status = cfnresponse.SUCCESS if is_delete else cfnresponse.FAILED
                  cfnresponse.send(event, context, status, {'Data': str(e)})
                  return

              outcome = 'Delete complete' if is_delete else 'Create complete'
              cfnresponse.send(event, context, cfnresponse.SUCCESS, {'Data': outcome})

          def invoke_redshift_data_api_lambda(lambda_arn, redshift_cluster_id, redshift_database, redshift_user, sql_text, sns_topic_arn):
              """Asynchronously invoke the ETL Lambda with a run_sql request.

              Returns the boto3 invoke() response.
              """
              client = boto3.client('lambda')

              lambda_payload = {
                  "Input": {
                      "redshift_cluster_id": redshift_cluster_id,
                      "redshift_database": redshift_database,
                      "redshift_user": redshift_user,
                      "action": "run_sql",
                      "sql_text": sql_text,
                      "sns_topic_arn": sns_topic_arn
                  }
              }
              response = client.invoke(
                  FunctionName=lambda_arn,
                  InvocationType='Event',
                  Payload=json.dumps(lambda_payload)
              )
              return response

  # Custom resource that calls LambdaSetupRedshiftObjects during stack
  # operations. The properties below arrive in the function's event under
  # ResourceProperties.
  SetupRedshiftObjects:
    Type: "Custom::SetupRedshiftObjects"
    DependsOn: LambdaSetupRedshiftObjects
    Properties:
      ServiceToken: !GetAtt LambdaSetupRedshiftObjects.Arn
      lambda_arn: !GetAtt LambdaRedshiftDataApiETL.Arn
      sns_topic_arn: !Ref RedshiftNotificationTopicSNS
      redshift_cluster_id: !Ref RedshiftClusterIdentifier
      redshift_database: !Ref DatabaseName
      redshift_user: !Ref DbUsername
      # Full role ARN assembled from the account id and the role-name parameter.
      redshift_cluster_iam_role: !Sub "arn:aws:iam::${AWS::AccountId}:role/${RedshiftIAMRoleName}"