AWSTemplateFormatVersion: "2010-09-09"
Description: >-
  SMS chatbot multilingual campaign. Kinesis Firehose delivers the Pinpoint
  event stream to S3, which triggers a Lambda function that checks the rolling
  opt-out window.

Parameters:
  OptoutAge:
    Type: Number
    Default: 15
    Description: Rolling window in days. Endpoints whose age reaches this limit are opted out.
  optoutmessage:
    Type: String
    Default: "You have completed the SMS campaign program. We will no longer be sending you scheduled messages. Thank you for your participation. Reply 'ENROLL' or 'JOIN' to enroll back in the program."
    Description: Message sent when a phone number is automatically opted out after the rolling window (in days) has elapsed.

Resources:
  PinpointStreamBucket:
    Type: AWS::S3::Bucket
    Properties:
      # No explicit BucketName: S3 bucket names must be lowercase and globally
      # unique, so CloudFormation generates one.
      BucketEncryption:
        ServerSideEncryptionConfiguration:
          - ServerSideEncryptionByDefault:
              SSEAlgorithm: AES256
      VersioningConfiguration:
        Status: Enabled

  PinpointFirehoseESIAMRole:
    Type: AWS::IAM::Role
    DeletionPolicy: Delete
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: QnAFirehose
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Sid: S3bucket
                Effect: Allow
                Action:
                  - s3:AbortMultipartUpload
                  - s3:GetBucketLocation
                  - s3:GetObject
                  - s3:ListBucket
                  - s3:ListBucketMultipartUploads
                  - s3:PutObject
                Resource:
                  - !GetAtt PinpointStreamBucket.Arn
                  - !Sub '${PinpointStreamBucket.Arn}/*'
              - Sid: CloudwatchLog
                Effect: Allow
                Action:
                  - logs:PutLogEvents
                Resource:
                  - !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/*'
              - Sid: Translate
                Effect: Allow
                Action:
                  - translate:*
                Resource: '*'

  PinpointStreamFirehose:
    Type: AWS::KinesisFirehose::DeliveryStream
    Properties:
      DeliveryStreamType: DirectPut
      S3DestinationConfiguration:
        BucketARN: !GetAtt PinpointStreamBucket.Arn
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 5
        CompressionFormat: UNCOMPRESSED
        RoleARN: !GetAtt PinpointFirehoseESIAMRole.Arn

  LambdaInvokePermission:
    Type: AWS::Lambda::Permission
    Properties:
      FunctionName: !GetAtt RollingWindowOptOutLambdaFunction.Arn
      Action: lambda:InvokeFunction
      Principal: s3.amazonaws.com
      SourceAccount: !Ref AWS::AccountId
      SourceArn: !GetAtt PinpointStreamBucket.Arn

  RollingWindowOptOutLambdaFunctionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Sid: S3Notification
                Effect: Allow
                Action:
                  - s3:GetBucketNotification
                  - s3:PutBucketNotification
                  - s3:GetObject
                  - s3:ListBucket
                Resource:
                  - !GetAtt PinpointStreamBucket.Arn
                  - !Sub '${PinpointStreamBucket.Arn}/*'
              - Sid: LogGroup
                Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: 'arn:aws:logs:*:*:*'
              - Sid: PinpointAll
                Effect: Allow
                Action: mobiletargeting:*
                Resource: !Sub 'arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/*'
              - Sid: Translate
                Effect: Allow
                Action: translate:TranslateText
                Resource: '*'

  RollingWindowOptOutLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Description: Compares the Pinpoint event date with the endpoint's first enrollment date and decides whether it is time to opt the endpoint out.
      Role: !GetAtt RollingWindowOptOutLambdaFunctionRole.Arn
      Handler: index.lambda_handler
      # python3.6 is a retired Lambda runtime and can no longer be used for new functions.
      Runtime: python3.12
      MemorySize: 512
      Timeout: 60
      Environment:
        Variables:
          Region: !Sub ${AWS::Region}
          OptoutAge: !Ref OptoutAge
          OptoutMessage: !Ref optoutmessage
      Code:
        ZipFile: |
          import json
          import boto3
          import os
          import re
          import datetime
          from urllib.parse import unquote_plus
          import logging

          logger = logging.getLogger()
          # The root logger defaults to WARNING, which would hide the info messages below.
          logger.setLevel(logging.INFO)

          s3_client = boto3.client('s3')
          pinpoint = boto3.client('pinpoint', region_name=os.environ['Region'])
          translate = boto3.client('translate')

          def lambda_handler(event, context):
              for record in event['Records']:
                  bucket = record['s3']['bucket']['name']
                  key = unquote_plus(record['s3']['object']['key'])
                  content = s3_client.get_object(Bucket=bucket, Key=key)['Body'].read().decode()
                  # The object holds whitespace-separated JSON events; turn them into one JSON array.
                  content = re.sub(r'}(\s+){', '},{', content)
                  content = '[' + content + ']'
                  eventObjArray = json.loads(content)
                  for eventObj in eventObjArray:
                      logger.info(eventObj)
                      projectId = eventObj['application']['app_id']
                      # event_timestamp is in milliseconds since the epoch.
                      eventTime = datetime.datetime.fromtimestamp(eventObj['event_timestamp']/1000)
                      # The endpoint ID is the destination phone number without the leading '+'.
                      endpointId = eventObj['attributes']['destination_phone_number'][1:]
                      response = pinpoint.get_endpoint(
                          ApplicationId=projectId,
                          EndpointId=endpointId
                      )
                      optOutFlag = response['EndpointResponse']['OptOut']
                      if optOutFlag == 'NONE':
                          logger.info("Endpoint: {} is currently Opt In.".format(endpointId))
                          # Use the most recent opt-in timestamp, dropping the UTC offset suffix.
                          optInTime = response['EndpointResponse']['Attributes']['OptInTimestamp'][-1].split('+')[0]
                          createTime = datetime.datetime.strptime(optInTime, '%a %b %d %Y %H:%M:%S %Z')
                          if abs((eventTime - createTime).days) >= int(os.environ['OptoutAge']):
                              logger.info("Endpoint age beyond limit. Automatically opting out.")
                              resp = pinpoint.update_endpoint(
                                  ApplicationId=projectId,
                                  EndpointId=endpointId,
                                  EndpointRequest={
                                      'OptOut': 'ALL',
                                      'Attributes': {
                                          'OptOutTimestamp': [
                                              eventTime.strftime("%Y-%m-%dT%H:%M:%S.%fZ")
                                          ]
                                      }
                                  }
                              )
                              # Translate the opt-out message into the endpoint user's language.
                              translatedMsg = translate.translate_text(
                                  Text=os.environ['OptoutMessage'],
                                  SourceLanguageCode='en',
                                  TargetLanguageCode=response['EndpointResponse']['User']['UserAttributes']['Language'][0]
                              )
                              resp = pinpoint.send_messages(
                                  ApplicationId=projectId,
                                  MessageRequest={
                                      'Addresses': {
                                          eventObj['attributes']['destination_phone_number']: {
                                              'ChannelType': 'SMS'
                                          }
                                      },
                                      'MessageConfiguration': {
                                          'SMSMessage': {
                                              'Body': translatedMsg['TranslatedText'],
                                              'MessageType': "TRANSACTIONAL",
                                              'OriginationNumber': eventObj['attributes']['origination_phone_number']
                                          }
                                      }
                                  }
                              )
                              logger.info(resp)
                          else:
                              logger.info("Nothing will be changed.")
                      else:
                          logger.info("Endpoint: {} is currently Opt Out.".format(endpointId))

  LambdaIAMRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - s3:GetBucketNotification
                  - s3:PutBucketNotification
                Resource: !Sub 'arn:aws:s3:::${PinpointStreamBucket}'
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: 'arn:aws:logs:*:*:*'

  CustomResourceLambdaFunction:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt LambdaIAMRole.Arn
      Code:
        ZipFile: |
          import json
          import boto3
          import cfnresponse

          print('Loading function')
          s3 = boto3.resource('s3')

          def lambda_handler(event, context):
              print("Received event: " + json.dumps(event, indent=2))
              responseData = {}
              try:
                  if event['RequestType'] == 'Delete':
                      print("Request Type:", event['RequestType'])
                      Bucket = event['ResourceProperties']['Bucket']
                      delete_notification(Bucket)
                      print("Sending response to custom resource after Delete")
                  elif event['RequestType'] == 'Create' or event['RequestType'] == 'Update':
                      print("Request Type:", event['RequestType'])
                      LambdaArn = event['ResourceProperties']['LambdaArn']
                      Bucket = event['ResourceProperties']['Bucket']
                      add_notification(LambdaArn, Bucket)
                      responseData = {'Bucket': Bucket}
                      print("Sending response to custom resource")
                  responseStatus = cfnresponse.SUCCESS
              except Exception as e:
                  print('Failed to process:', e)
                  # CloudFormation only accepts the status values SUCCESS and FAILED.
                  responseStatus = cfnresponse.FAILED
                  responseData = {'Failure': 'Something bad happened.'}
              cfnresponse.send(event, context, responseStatus, responseData)

          def add_notification(LambdaArn, Bucket):
              # Invoke the Lambda function for every object created in the bucket.
              bucket_notification = s3.BucketNotification(Bucket)
              response = bucket_notification.put(
                  NotificationConfiguration={
                      'LambdaFunctionConfigurations': [
                          {
                              'LambdaFunctionArn': LambdaArn,
                              'Events': [
                                  's3:ObjectCreated:*'
                              ]
                          }
                      ]
                  }
              )
              print("Put request completed....")

          def delete_notification(Bucket):
              # An empty configuration removes all notifications from the bucket.
              bucket_notification = s3.BucketNotification(Bucket)
              response = bucket_notification.put(
                  NotificationConfiguration={}
              )
              print("Delete request completed....")
      Runtime: python3.12
      Timeout: 50

  LambdaTrigger:
    Type: Custom::LambdaTrigger
    DependsOn: LambdaInvokePermission
    Properties:
      ServiceToken: !GetAtt CustomResourceLambdaFunction.Arn
      LambdaArn: !GetAtt RollingWindowOptOutLambdaFunction.Arn
      Bucket: !Ref PinpointStreamBucket