# SES event processing stack.
#
# Flow built by this template:
#   SNS topic -> SQS queue -> Lambda (SESNotificationProcessingLambda), which
#   adds permanently bounced / complained recipients to the SES account-level
#   suppression list.
# A custom resource (UpdateSendingIdenties) runs at deploy time and points the
# Bounce and Complaint notifications of the SES identities at the SNS topic.
AWSTemplateFormatVersion: "2010-09-09"
Description: SES Event Processing

Parameters:
  OverwriteExistingSESNotifications:
    Type: String
    # Quoted so YAML keeps these as strings; the helper compares against 'true'.
    Default: "false"
    AllowedValues: ["true", "false"]
    Description: Force overwrite existing SES Sending Identity Bounce and Complaint Notification SNS settings

Resources:
  # Topic that the SES identities publish Bounce / Complaint notifications to.
  SESNotificationTopic:
    Type: AWS::SNS::Topic
    Properties:
      DisplayName: SESNotifications
      # NOTE(review): this is the AWS-managed key alias, whose key policy
      # cannot be edited. Confirm SES is able to publish to a topic encrypted
      # with it; a customer-managed key may be required.
      KmsMasterKeyId: alias/aws/sns

  # Queue that buffers notifications for the processing Lambda.
  SESNotificationQueue:
    Type: AWS::SQS::Queue
    Properties:
      # Longer than the processing Lambda's 300 second timeout.
      VisibilityTimeout: 500
      # NOTE(review): AWS-managed key alias; confirm SNS can deliver to a queue
      # encrypted with it (a customer-managed key may be required).
      KmsMasterKeyId: alias/aws/sqs

  # Delivers topic messages to the queue. Raw message delivery is left at its
  # default because the Lambda reads the 'Message' field of the SNS envelope.
  SnsSubscription:
    Type: AWS::SNS::Subscription
    Properties:
      Protocol: sqs
      Endpoint: !GetAtt SESNotificationQueue.Arn
      Region: !Ref AWS::Region
      TopicArn: !Ref SESNotificationTopic

  # Allows sqs:SendMessage only when the source ARN is the notification topic.
  SESNotificationQueuePolicy:
    Type: AWS::SQS::QueuePolicy
    Properties:
      Queues:
        - !Ref SESNotificationQueue
      PolicyDocument:
        Id: "PublishSNStoSQS"
        Statement:
          - Effect: "Allow"
            Principal: "*"
            Action:
              - "sqs:SendMessage"
            Resource: "*"
            Condition:
              ArnEquals:
                aws:SourceArn: !Ref SESNotificationTopic

  # Feeds queue messages to the processing Lambda in batches of up to 10.
  LambdaFunctionEventSourceMapping:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      BatchSize: 10
      Enabled: true
      EventSourceArn: !GetAtt SESNotificationQueue.Arn
      FunctionName: !GetAtt SESNotificationProcessingLambda.Arn

  # Adds bounced / complained recipients to the SES suppression list.
  SESNotificationProcessingLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt SESNotificationProcessingLambdaRole.Arn
      # python3.7 has reached end of support in AWS Lambda.
      Runtime: "python3.12"
      Timeout: 300
      Environment:
        Variables:
          LOG_LEVEL: "INFO"
      Code:
        ZipFile: |
          import boto3
          import logging
          import os
          import json
          from botocore.exceptions import ClientError

          sesv2 = boto3.client("sesv2")


          def lambda_handler(event, context):
              """Suppress recipients named in SES Bounce/Complaint notifications.

              Each SQS record body is an SNS envelope whose 'Message' field
              holds the SES notification as a JSON string.
              """
              log_level = str(os.environ.get("LOG_LEVEL")).upper()
              if log_level not in ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]:
                  log_level = "ERROR"
              logging.getLogger().setLevel(log_level)
              logging.info(event)

              for record in event['Records']:
                  try:
                      ses_event = json.loads(json.loads(record['body'])['Message'])
                      logging.info(ses_event)

                      # Only permanent (hard) bounces are suppressed.
                      if (ses_event["notificationType"] == "Bounce"
                              and ses_event["bounce"]["bounceType"] == "Permanent"):
                          for recipient in ses_event["bounce"]["bouncedRecipients"]:
                              sesv2.put_suppressed_destination(
                                  EmailAddress=recipient["emailAddress"],
                                  Reason="BOUNCE"
                              )
                              # CUSTOMER TODO - Update customer system with Bounce status

                      # Spam complaint
                      elif ses_event["notificationType"] == "Complaint":
                          for recipient in ses_event["complaint"]["complainedRecipients"]:
                              sesv2.put_suppressed_destination(
                                  EmailAddress=recipient["emailAddress"],
                                  Reason="COMPLAINT"
                              )
                              # CUSTOMER TODO - Update customer system with Complaint status
                  except Exception as e:
                      # NOTE(review): the error is only logged and the handler
                      # still returns normally, so the record is presumably
                      # not retried - confirm this best-effort is intended.
                      logging.error("Received Error while processing SQS record: %s", e)

  # Execution role for the processing Lambda: suppression-list writes, queue
  # consumption, and CloudWatch Logs.
  SESNotificationProcessingLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - "sts:AssumeRole"
      Path: "/"
      Policies:
        - PolicyName: "root"
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: "Allow"
                Action:
                  - "ses:PutSuppressedDestination"
                Resource: "*"
              - Effect: Allow
                Action:
                  - sqs:ReceiveMessage
                  - sqs:DeleteMessage
                  - sqs:DeleteMessageBatch
                  - sqs:GetQueueAttributes
                  - sqs:ChangeMessageVisibility
                Resource: !GetAtt SESNotificationQueue.Arn
              - Effect: "Allow"
                Action:
                  - "logs:CreateLogGroup"
                  - "logs:CreateLogStream"
                  - "logs:PutLogEvents"
                Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"

  ### Deployment Helper - Update Sending Identities
  # The logical ID and action string keep their original spelling
  # ("Identies") because the helper code matches on the action string and
  # renaming the logical ID would replace the resource.
  UpdateSendingIdenties:
    Type: Custom::LoadLambda
    Properties:
      ServiceToken: !GetAtt CustomResourceHelper.Arn
      CustomResourceAction: UpdateSendingIdenties

  CustomResourceHelper:
    Type: AWS::Lambda::Function
    Properties:
      Environment:
        Variables:
          SNS_TOPIC_ARN: !Ref SESNotificationTopic
          LOG_LEVEL: 'INFO'
          OVERWRITE_EXISTING: !Ref OverwriteExistingSESNotifications
      Description: Updates the current Sending Identities to write to topics.
      Handler: index.lambda_handler
      MemorySize: 256
      Role: !GetAtt CustomResourceHelperRole.Arn
      # python3.7 has reached end of support in AWS Lambda.
      Runtime: "python3.12"
      Timeout: 300
      Code:
        ZipFile: |
          import cfnresponse
          import os
          import logging
          import traceback
          import boto3
          import json

          ses = boto3.client('ses')


          def lambda_handler(event, context):
              """Point SES identities' Bounce/Complaint notifications at the topic.

              Existing topic settings are kept unless OVERWRITE_EXISTING is
              'true'. Errors while updating are logged and SUCCESS is still
              reported, so a failed update does not fail the stack.
              """
              log_level = str(os.environ.get('LOG_LEVEL')).upper()
              if log_level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
                  log_level = 'DEBUG'
              logging.getLogger().setLevel(log_level)
              logging.info(event)

              topic = os.environ.get('SNS_TOPIC_ARN')
              overwrite_existing = os.environ.get('OVERWRITE_EXISTING') == 'true'

              if event['ResourceProperties']['CustomResourceAction'] != 'UpdateSendingIdenties':
                  cfnresponse.send(event, context, cfnresponse.FAILED, {"success": False}, "error")
                  return

              if event.get('RequestType') == 'Delete':
                  # Do not point identities at a topic that is being deleted.
                  logging.info('Delete request, nothing done')
                  cfnresponse.send(event, context, cfnresponse.SUCCESS, {"success": True}, 'UpdateSendingIdenties')
                  return

              next_token = ''
              try:
                  while True:
                      response = ses.list_identities(MaxItems=10, NextToken=next_token)
                      logging.info(response)
                      # Reset on every page: the last page carries no
                      # NextToken, and keeping the old one would loop forever.
                      next_token = response.get('NextToken', '')

                      identities = response.get('Identities', [])
                      if identities:
                          attr_response = ses.get_identity_notification_attributes(
                              Identities=identities
                          )
                          logging.info(attr_response)
                          for identity, attrs in attr_response['NotificationAttributes'].items():
                              for kind in ('Bounce', 'Complaint'):
                                  if overwrite_existing or not attrs.get(kind + 'Topic'):
                                      ses.set_identity_notification_topic(
                                          Identity=identity,
                                          NotificationType=kind,
                                          SnsTopic=topic
                                      )
                                      logging.info('%s SNS Topic Added for: %s', kind, identity)
                                  else:
                                      logging.info('Found Existing %s SNS Topic Added for: %s, nothing done', kind, identity)

                      if next_token == '':
                          break
              except Exception as error:
                  logging.error('setPED error: %s' % (error))
                  logging.error('setPED trace: %s' % traceback.format_exc())

              cfnresponse.send(event, context, cfnresponse.SUCCESS, {"success": True}, 'UpdateSendingIdenties')

  # Execution role for the deployment helper: CloudWatch Logs plus the SES
  # identity calls made by the helper code.
  CustomResourceHelperRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutDestination
                  - logs:PutLogEvents
                Resource:
                  - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"
              - Effect: "Allow"
                Action:
                  - "ses:ListIdentities"
                  - "ses:GetIdentityNotificationAttributes"
                  - "ses:SetIdentityNotificationTopic"
                Resource: "*"