Description: SQL Query Governance for Amazon Aurora and AWS RDS
Parameters:
  SubscriptionEmail:
    Default: user@example.com
    Type: String

Resources:
  SNSTopic:
        Type: "AWS::SNS::Topic"
        Properties:
            DisplayName: "SnsQueryGovernance"
            TopicName: "SnsQueryGovernance"
            KmsMasterKeyId: "alias/aws/sns"

  SNSSubscription:
        Type: "AWS::SNS::Subscription"
        Properties:
            TopicArn: !Ref SNSTopic
            Endpoint: !Ref SubscriptionEmail
            Protocol: "email"
            Region: !Ref AWS::Region
            

  IAMPolicy:
        Type: "AWS::IAM::Policy"
        Properties:
            PolicyDocument: !Sub |
                {
                    "Version": "2012-10-17",
                    "Statement": [
                        {
                            "Effect": "Allow",
                            "Action": "logs:CreateLogGroup",
                            "Resource": "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"
                        },
                        {
                            "Effect": "Allow",
                            "Action": [
                                "logs:CreateLogStream",
                                "logs:PutLogEvents"
                            ],
                            "Resource": [
                                "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${LambdaFunctionMysql}:*",
                                "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${LambdaFunctionPostgresql}:*"
                            ]
                        },
                        {
                            "Sid": "DynamoDB",
                            "Effect": "Allow",
                            "Action": "dynamodb:PutItem",
                            "Resource": "arn:aws:dynamodb:*:${AWS::AccountId}:table/${DynamoDBTable}"
                        },
                        {
                            "Sid": "SNS",
                            "Effect": "Allow",
                            "Action": "sns:Publish",
                            "Resource": "${SNSTopic}"
                        }
                    ]
                }
            Roles: 
              - !Ref IAMRole
            PolicyName: "iam_policy_query_governance"


  IAMRole:
        Type: "AWS::IAM::Role"
        Properties:
            Path: "/"
            RoleName: "IAMRoleQueryGovernance"
            AssumeRolePolicyDocument: "{\"Version\":\"2012-10-17\",\"Statement\":[{\"Effect\":\"Allow\",\"Principal\":{\"Service\":\"lambda.amazonaws.com\"},\"Action\":\"sts:AssumeRole\"}]}"
            MaxSessionDuration: 3600
            Description: "IAM Role for Query Governance"

  DynamoDBTable:
        Type: "AWS::DynamoDB::Table"
        Properties:
            AttributeDefinitions: 
              - 
                AttributeName: "id"
                AttributeType: "S"
              - 
                AttributeName: "instance"
                AttributeType: "S"
            BillingMode: "PAY_PER_REQUEST"
            TableName: "DynamoDBQueryGovernanceTable"
            PointInTimeRecoverySpecification :
                PointInTimeRecoveryEnabled: "true"
            SSESpecification:
                SSEEnabled: "true"
            KeySchema: 
              - 
                AttributeName: "instance"
                KeyType: "HASH"
              - 
                AttributeName: "id"
                KeyType: "RANGE"

  
  LambdaFunctionMysql:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: LambdaQueryGovernanceMysql
      Role: !GetAtt IAMRole.Arn
      Runtime: python3.9
      Handler: index.lambda_handler
      Timeout: 30
      ReservedConcurrentExecutions: 5
      Environment: 
                Variables: 
                    AWS_DYNAMODB_TABLE: !Ref DynamoDBTable
                    AWS_SNS_TOPIC: !Ref SNSTopic
      Code:
        ZipFile: |
          import gzip
          import json
          import base64
          import boto3
          import os

          sns_topic=os.environ['AWS_SNS_TOPIC']  
          dynamodb_table=os.environ['AWS_DYNAMODB_TABLE'] 
          client_sns = boto3.client("sns")
          client_dynamodb = boto3.client('dynamodb')

          def lambda_handler(event, context):
              cw_data = event['awslogs']['data']
              compressed_payload = base64.b64decode(cw_data)
              uncompressed_payload = gzip.decompress(compressed_payload)
              payload = json.loads(uncompressed_payload)
              log_group=payload['logGroup'].split("/")
              rds_instance=log_group[4]
              log_events = payload['logEvents']
              for log_event in log_events:
                  message_raw=log_event['message']
                  message_lines=message_raw.splitlines()
                  timestamp=message_lines[0].split(" ")
                  userid=message_lines[1].split(" ")
                  duration=message_lines[2].split(" ")
                  sql_statement=' '.join(map(str, message_lines[3:])) 
                  payload_log_event={"id": log_event['id'], "type": "mysql", "instance":rds_instance, "timestamp":timestamp[2], "duration": duration[2] , "host": userid[5], "userid": userid[2],  "statement" : sql_statement}
                  print (payload_log_event)
                  send_notification(payload_log_event)
                  save_event(payload_log_event)
                  

          def send_notification(payload):
              client_sns.publish(
                              TopicArn=sns_topic, 
                              Message=json.dumps({'default': json.dumps(payload)}),
                              MessageStructure = 'json',
                              Subject=f'Alert - SQL Query Governance : {payload["instance"]} - {payload["timestamp"]}'
                              )
                  

          def save_event(payload):
              client_dynamodb.put_item(
                                      TableName=dynamodb_table,
                                      Item={
                                                  'instance': {
                                                    'S': payload["instance"]
                                                  },
                                                  'id': {
                                                    'S': payload["id"]
                                                  },
                                                  'type': {
                                                    'S': payload["type"]
                                                  },
                                                  'duration': {
                                                    'N': payload["duration"]
                                                  },
                                                  'timestamp': {
                                                    'S': payload["timestamp"]
                                                  },
                                                  'userid': {
                                                    'S': payload["userid"]
                                                  },
                                                  'host': {
                                                    'S': payload["host"]
                                                  },
                                                  'statement': {
                                                    'S': payload["statement"]
                                                  }
                                                  
                                              }
                                      )



  LambdaFunctionPostgresql:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: LambdaQueryGovernancePostgresql
      Role: !GetAtt IAMRole.Arn
      Runtime: python3.9
      Handler: index.lambda_handler
      ReservedConcurrentExecutions: 5
      Timeout: 30
      Environment: 
                Variables: 
                    AWS_DYNAMODB_TABLE: !Ref DynamoDBTable
                    AWS_SNS_TOPIC: !Ref SNSTopic
      Code:
        ZipFile: |
          import gzip
          import json
          import base64
          import boto3
          import os

          sns_topic=os.environ['AWS_SNS_TOPIC']  
          dynamodb_table=os.environ['AWS_DYNAMODB_TABLE'] 
          client_sns = boto3.client("sns")
          client_dynamodb = boto3.client('dynamodb')

          def lambda_handler(event, context):
              cw_data = event['awslogs']['data']
              compressed_payload = base64.b64decode(cw_data)
              uncompressed_payload = gzip.decompress(compressed_payload)
              payload = json.loads(uncompressed_payload)
              log_group=payload['logGroup'].split("/")
              rds_instance=log_group[4]
              log_events = payload['logEvents']
              for log_event in log_events:
                  message_raw=log_event['message']
                  sql_statement=message_raw[int(message_raw.index("statement:"))+11:]
                  message=message_raw.split(" ")
                  session_info=message[2].split(":")
                  payload_log_event={"id": log_event['id'], "type": "postgresql", "instance":rds_instance, "timestamp": message[0] + " " + message[1], "duration": message[5] , "host": session_info[1], "userid": session_info[2],  "statement" : sql_statement}
                  print (payload_log_event)
                  send_notification(payload_log_event)
                  save_event(payload_log_event)
                  

          def send_notification(payload):
              client_sns.publish(
                                TopicArn=sns_topic, 
                                Message=json.dumps({'default': json.dumps(payload)}),
                                MessageStructure = 'json',
                                Subject=f'Alert - SQL Query Governance : {payload["instance"]} - {payload["timestamp"]}'
                                )
                  

          def save_event(payload):
              client_dynamodb.put_item(
                                      TableName=dynamodb_table,
                                      Item={
                                                  'instance': {
                                                    'S': payload["instance"]
                                                  },
                                                  'id': {
                                                    'S': payload["id"]
                                                  },
                                                  'type': {
                                                    'S': payload["type"]
                                                  },
                                                  'duration': {
                                                    'N': payload["duration"]
                                                  },
                                                  'timestamp': {
                                                    'S': payload["timestamp"]
                                                  },
                                                  'userid': {
                                                    'S': payload["userid"]
                                                  },
                                                  'host': {
                                                    'S': payload["host"]
                                                  },
                                                  'statement': {
                                                    'S': payload["statement"]
                                                  }
                                                  
                                              }
                                      )