---
# CloudFormation template: SQL query governance pipeline.
#
# Two Lambda functions (one for MySQL-style log events, one for
# PostgreSQL-style log events) decode CloudWatch Logs subscription payloads,
# publish each parsed event to an SNS topic, and store it in a DynamoDB table.
#
# NOTE(review): this template does not define the CloudWatch Logs
# subscription filters or the Lambda invoke permissions that would deliver
# log events to the functions; presumably they are created elsewhere.
Description: SQL Query Governance for Amazon Aurora and AWS RDS

Parameters:
  # E-mail address subscribed to the alert topic.
  SubscriptionEmail:
    Default: user@example.com
    Type: String

Resources:
  # Alert topic, encrypted at rest with the AWS-managed SNS key.
  SNSTopic:
    Type: "AWS::SNS::Topic"
    Properties:
      DisplayName: "SnsQueryGovernance"
      TopicName: "SnsQueryGovernance"
      KmsMasterKeyId: "alias/aws/sns"

  # E-mail subscription for the address given in SubscriptionEmail.
  SNSSubscription:
    Type: "AWS::SNS::Subscription"
    Properties:
      TopicArn: !Ref SNSTopic
      Endpoint: !Ref SubscriptionEmail
      Protocol: "email"
      Region: !Ref "AWS::Region"

  # Inline policy attached to the shared Lambda execution role.
  # Written as a native YAML policy document rather than JSON-in-a-string.
  IAMPolicy:
    Type: "AWS::IAM::Policy"
    Properties:
      PolicyName: "iam_policy_query_governance"
      Roles:
        - !Ref IAMRole
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Action: "logs:CreateLogGroup"
            Resource: !Sub "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:*"
          # Each function may only write to its own /aws/lambda/<name> group.
          - Effect: "Allow"
            Action:
              - "logs:CreateLogStream"
              - "logs:PutLogEvents"
            Resource:
              - !Sub "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${LambdaFunctionMysql}:*"
              - !Sub "arn:${AWS::Partition}:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/lambda/${LambdaFunctionPostgresql}:*"
          # Scoped to the exact table created by this stack (no region
          # wildcard): the functions only ever write to that table.
          - Sid: "DynamoDB"
            Effect: "Allow"
            Action: "dynamodb:PutItem"
            Resource: !GetAtt DynamoDBTable.Arn
          # !Ref on an SNS topic yields the topic ARN.
          - Sid: "SNS"
            Effect: "Allow"
            Action: "sns:Publish"
            Resource: !Ref SNSTopic

  # Execution role assumed by both Lambda functions.
  IAMRole:
    Type: "AWS::IAM::Role"
    Properties:
      Path: "/"
      RoleName: "IAMRoleQueryGovernance"
      Description: "IAM Role for Query Governance"
      MaxSessionDuration: 3600
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: "Allow"
            Principal:
              Service: "lambda.amazonaws.com"
            Action: "sts:AssumeRole"

  # Event store: partition key "instance", sort key "id".
  DynamoDBTable:
    Type: "AWS::DynamoDB::Table"
    Properties:
      TableName: "DynamoDBQueryGovernanceTable"
      BillingMode: "PAY_PER_REQUEST"
      AttributeDefinitions:
        - AttributeName: "id"
          AttributeType: "S"
        - AttributeName: "instance"
          AttributeType: "S"
      KeySchema:
        - AttributeName: "instance"
          KeyType: "HASH"
        - AttributeName: "id"
          KeyType: "RANGE"
      PointInTimeRecoverySpecification:
        PointInTimeRecoveryEnabled: true
      SSESpecification:
        SSEEnabled: true

  # Handler for MySQL-style log events.
  # NOTE(review): the parser expects each message to start with three header
  # lines (time, user/host, query time) followed by the statement, which
  # looks like the MySQL slow query log format; confirm against the
  # subscribed log group.
  LambdaFunctionMysql:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: LambdaQueryGovernanceMysql
      Role: !GetAtt IAMRole.Arn
      Runtime: python3.9
      Handler: index.lambda_handler
      Timeout: 30
      ReservedConcurrentExecutions: 5
      Environment:
        Variables:
          AWS_DYNAMODB_TABLE: !Ref DynamoDBTable
          AWS_SNS_TOPIC: !Ref SNSTopic
      Code:
        ZipFile: |
          import gzip
          import json
          import base64
          import boto3
          import os

          sns_topic = os.environ['AWS_SNS_TOPIC']
          dynamodb_table = os.environ['AWS_DYNAMODB_TABLE']
          client_sns = boto3.client("sns")
          client_dynamodb = boto3.client('dynamodb')

          def lambda_handler(event, context):
              # Payload arrives base64-encoded and gzip-compressed.
              cw_data = event['awslogs']['data']
              compressed_payload = base64.b64decode(cw_data)
              uncompressed_payload = gzip.decompress(compressed_payload)
              payload = json.loads(uncompressed_payload)
              # The 5th "/"-separated segment of the log group is the instance.
              log_group = payload['logGroup'].split("/")
              rds_instance = log_group[4]
              log_events = payload['logEvents']
              for log_event in log_events:
                  message_raw = log_event['message']
                  message_lines = message_raw.splitlines()
                  timestamp = message_lines[0].split(" ")
                  userid = message_lines[1].split(" ")
                  duration = message_lines[2].split(" ")
                  sql_statement = ' '.join(map(str, message_lines[3:]))
                  payload_log_event = {"id": log_event['id'], "type": "mysql", "instance": rds_instance, "timestamp": timestamp[2], "duration": duration[2], "host": userid[5], "userid": userid[2], "statement": sql_statement}
                  print (payload_log_event)
                  send_notification(payload_log_event)
                  save_event(payload_log_event)

          def send_notification(payload):
              subject = f'Alert - SQL Query Governance : {payload["instance"]} - {payload["timestamp"]}'
              client_sns.publish(
                  TopicArn=sns_topic,
                  Message=json.dumps({'default': json.dumps(payload)}),
                  MessageStructure='json',
                  # SNS rejects subjects of 100+ characters; long instance
                  # names would otherwise make publish() fail.
                  Subject=subject[:99]
              )

          def save_event(payload):
              client_dynamodb.put_item(
                  TableName=dynamodb_table,
                  Item={
                      'instance': {'S': payload["instance"]},
                      'id': {'S': payload["id"]},
                      'type': {'S': payload["type"]},
                      'duration': {'N': payload["duration"]},
                      'timestamp': {'S': payload["timestamp"]},
                      'userid': {'S': payload["userid"]},
                      'host': {'S': payload["host"]},
                      'statement': {'S': payload["statement"]}
                  }
              )

  # Handler for PostgreSQL-style log events.
  # NOTE(review): the parser expects single-line messages shaped like
  # "<date> <time> <tz>:<host>:<user>:...  duration: <n> ms  statement: <sql>";
  # confirm this matches the instance's log_line_prefix.
  LambdaFunctionPostgresql:
    Type: AWS::Lambda::Function
    Properties:
      FunctionName: LambdaQueryGovernancePostgresql
      Role: !GetAtt IAMRole.Arn
      Runtime: python3.9
      Handler: index.lambda_handler
      ReservedConcurrentExecutions: 5
      Timeout: 30
      Environment:
        Variables:
          AWS_DYNAMODB_TABLE: !Ref DynamoDBTable
          AWS_SNS_TOPIC: !Ref SNSTopic
      Code:
        ZipFile: |
          import gzip
          import json
          import base64
          import boto3
          import os

          sns_topic = os.environ['AWS_SNS_TOPIC']
          dynamodb_table = os.environ['AWS_DYNAMODB_TABLE']
          client_sns = boto3.client("sns")
          client_dynamodb = boto3.client('dynamodb')

          def lambda_handler(event, context):
              # Payload arrives base64-encoded and gzip-compressed.
              cw_data = event['awslogs']['data']
              compressed_payload = base64.b64decode(cw_data)
              uncompressed_payload = gzip.decompress(compressed_payload)
              payload = json.loads(uncompressed_payload)
              # The 5th "/"-separated segment of the log group is the instance.
              log_group = payload['logGroup'].split("/")
              rds_instance = log_group[4]
              log_events = payload['logEvents']
              for log_event in log_events:
                  message_raw = log_event['message']
                  # Skip past "statement:" (10 chars) plus one separator char.
                  sql_statement = message_raw[message_raw.index("statement:") + 11:]
                  message = message_raw.split(" ")
                  session_info = message[2].split(":")
                  payload_log_event = {"id": log_event['id'], "type": "postgresql", "instance": rds_instance, "timestamp": message[0] + " " + message[1], "duration": message[5], "host": session_info[1], "userid": session_info[2], "statement": sql_statement}
                  print (payload_log_event)
                  send_notification(payload_log_event)
                  save_event(payload_log_event)

          def send_notification(payload):
              subject = f'Alert - SQL Query Governance : {payload["instance"]} - {payload["timestamp"]}'
              client_sns.publish(
                  TopicArn=sns_topic,
                  Message=json.dumps({'default': json.dumps(payload)}),
                  MessageStructure='json',
                  # SNS rejects subjects of 100+ characters; long instance
                  # names would otherwise make publish() fail.
                  Subject=subject[:99]
              )

          def save_event(payload):
              client_dynamodb.put_item(
                  TableName=dynamodb_table,
                  Item={
                      'instance': {'S': payload["instance"]},
                      'id': {'S': payload["id"]},
                      'type': {'S': payload["type"]},
                      'duration': {'N': payload["duration"]},
                      'timestamp': {'S': payload["timestamp"]},
                      'userid': {'S': payload["userid"]},
                      'host': {'S': payload["host"]},
                      'statement': {'S': payload["statement"]}
                  }
              )