AWSTemplateFormatVersion: 2010-09-09 Description: Pinpoint Event Processing Parameters: PinpointProjectId: Type: String Description: Amazon Pinpoint Project ID if one already exists, blank to create one PinpointProjectName: Type: String Default: "My Pinpoint Project" Description: "If no PinpointProjectId provided, name to be used to create the Pinpoint project" PinpointEventS3BucketName: Type: String Description: Name of the S3 bucket to create to contain the Pinpoint events - must be unique! Conditions: NeedsPinpointProjectId: !Equals - '' - !Ref PinpointProjectId Resources: PinpointApplication: Type: AWS::Pinpoint::App Condition: NeedsPinpointProjectId Properties: Name: !Ref PinpointProjectName DataS3Bucket: Type: AWS::S3::Bucket DeletionPolicy: Retain Metadata: cfn_nag: rules_to_suppress: - id: W51 reason: Not public facing. Properties: BucketName: !Ref PinpointEventS3BucketName PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 VersioningConfiguration: Status: Enabled LoggingConfiguration: DestinationBucketName: !Ref LogBucket LogFilePrefix: pinpoint-event-processing/ NotificationConfiguration: LambdaConfigurations: - Event: "s3:ObjectCreated:*" Function: !GetAtt EventProcessingLambda.Arn LogBucket: Type: AWS::S3::Bucket DeletionPolicy: Retain Metadata: cfn_nag: rules_to_suppress: - id: W35 reason: This is the log bucket. Properties: AccessControl: LogDeliveryWrite PublicAccessBlockConfiguration: BlockPublicAcls: True BlockPublicPolicy: True IgnorePublicAcls: True RestrictPublicBuckets: True BucketEncryption: ServerSideEncryptionConfiguration: - ServerSideEncryptionByDefault: SSEAlgorithm: AES256 VersioningConfiguration: Status: Enabled LogBucketPolicy: Type: AWS::S3::BucketPolicy Properties: Bucket: !Ref LogBucket PolicyDocument: Version: 2012-10-17 Statement: - Sid: AWSCloudTrailAclCheck Effect: Allow Principal: Service: cloudtrail.amazonaws.com Action: "s3:GetBucketAcl" Resource: !Sub arn:aws:s3:::${LogBucket} - Sid: AWSCloudTrailWrite Effect: Allow Principal: Service: cloudtrail.amazonaws.com Action: "s3:PutObject" Resource: !Sub arn:aws:s3:::${LogBucket}/AWSLogs/${AWS::AccountId}/* Condition: StringEquals: "s3:x-amz-acl": "bucket-owner-full-control" PinpointEventStream: Type: AWS::Pinpoint::EventStream Properties: ApplicationId: !If - NeedsPinpointProjectId - !Ref PinpointApplication - !Ref PinpointProjectId DestinationStreamArn: !GetAtt PinpointEventFirehose.Arn RoleArn: !GetAtt PinpointKinesisStreamRole.Arn PinpointEventFirehose: Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamType: "DirectPut" ExtendedS3DestinationConfiguration: BucketARN: !Sub "arn:aws:s3:::${DataS3Bucket}" BufferingHints: IntervalInSeconds: 300 SizeInMBs: 5 CompressionFormat: "UNCOMPRESSED" Prefix: "events/" RoleARN: !GetAtt PinpointKinesisFirehoseRole.Arn PinpointKinesisFirehoseRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Principal: Service: - firehose.amazonaws.com Action: - "sts:AssumeRole" Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "s3:AbortMultipartUpload" - "s3:GetBucketLocation" - "s3:GetObject" - "s3:ListBucket" - "s3:ListBucketMultipartUploads" - "s3:PutObject" Resource: - !Sub "arn:aws:s3:::${PinpointEventS3BucketName}*" - !Sub "arn:aws:s3:::${PinpointEventS3BucketName}" - Effect: "Allow" Action: "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/*:log-stream:*" PinpointKinesisStreamRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - pinpoint.amazonaws.com Action: - "sts:AssumeRole" Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "firehose:PutRecordBatch" - "firehose:DescribeDeliveryStream" Resource: !GetAtt PinpointEventFirehose.Arn EventProcessingLambda: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Role: !GetAtt EventProcessingLambdaRole.Arn Runtime: "python3.7" Timeout: 60 Environment: Variables: LOG_LEVEL: "INFO" Code: ZipFile: | import boto3 import logging import os import json from botocore.exceptions import ClientError sesv2 = boto3.client('sesv2') s3 = boto3.resource('s3') def lambda_handler(event, context): global log_level log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'ERROR' logging.getLogger().setLevel(log_level) logging.info(event) # Loop over all added files from S3 for record in event['Records']: bucket = record['s3']['bucket']['name'] key = record['s3']['object']['key'] obj = s3.Object(bucket, key) # Loop over each SES event in the S3 firehose file for line in obj.get()['Body']._raw_stream: try: pinpoint_event = json.loads(line) logging.info(pinpoint_event) # If event is a Bounce, handle it if (pinpoint_event['event_type'] == '_email.hardbounce' and pinpoint_event['facets']['email_channel']['mail_event']['bounce']['bounce_type'] == 'Permanent' ): for recipient in pinpoint_event['facets']['email_channel']['mail_event']['bounce']['bounced_recipients']: logging.info('Found Hard Bounce for email address: %s', recipient['email_address']) sesv2.put_suppressed_destination( EmailAddress = recipient['email_address'], Reason = 'BOUNCE' ) # CUSTOMER TODO - Update customer system with Bounce status # If event is a spam complaint elif (pinpoint_event['event_type'] == '_email.complaint'): for recipient in pinpoint_event['facets']['email_channel']['mail_event']['complaint']['complained_recipients']: logging.info('Found Complaint for email address: %s', recipient['email_address']) sesv2.put_suppressed_destination( EmailAddress = recipient['email_address'], Reason = 'COMPLAINT' ) # CUSTOMER TODO - Update customer system with Complaint status except Exception as e: logging.error('Received Error while processing s3 file: %s', e) EventProcessingLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - "sts:AssumeRole" Path: "/" EventProcessingLambdaRolePolicy: Type: AWS::IAM::Policy Properties: Roles: - !Ref EventProcessingLambdaRole PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "ses:PutSuppressedDestination" Resource: "*" - Effect: "Allow" Action: - "s3:GetBucketLocation" - "s3:GetObject" - "s3:ListBucket" Resource: - !Sub arn:aws:s3:::${PinpointEventS3BucketName} - !Sub arn:aws:s3:::${PinpointEventS3BucketName}/* - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" S3InvokeLambdaPermission: Type: AWS::Lambda::Permission Properties: Action: lambda:InvokeFunction FunctionName: !Ref EventProcessingLambda Principal: s3.amazonaws.com SourceArn: !Sub arn:aws:s3:::${PinpointEventS3BucketName} SourceAccount: !Ref "AWS::AccountId"