Description: This template corresponds to "no event stream configured" scenario with your Pinpoint project and hence configures event stream with a Kinesis Data Stream and a Lambda consumer that triggers SMS retry on _SMS.FAILURE events having record status as UNREACHABLE, UNKNOWN, CARRIER_UNREACHABLE & TTL_EXPIRED Parameters: PinpointApplicationId: Description: The ID of your Pinpoint Application Type: String Resources: DynamoDBTable: Type: AWS::DynamoDB::Table Properties: TableName: !Join - "-" - - "pinpoint_message_state_table" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" BillingMode: PAY_PER_REQUEST AttributeDefinitions: - AttributeName: application_message_id AttributeType: S SSESpecification: SSEEnabled: True KeySchema: - AttributeName: application_message_id KeyType: HASH PinpointDDBProducerLambdaFunction: Type: AWS::Lambda::Function DependsOn: - RetryLambdaExecutionRole - DynamoDBTable Properties: Code: ZipFile: | import json import boto3 import botocore from botocore.exceptions import ClientError import random import datetime import os import string pinpoint_client = boto3.client('pinpoint') ddb_client = boto3.client('dynamodb') ddb_table = os.environ['dynamodbStateTableName'] application_id = os.environ['pinpointApplicationId'] message_type = os.environ['messageType'] destination_number = os.environ['destinationNumber'] def lambda_handler(event, context): message = ("Sample body of SMS sent from Lambda at "+ str(datetime.datetime.now())) destinationNumber = destination_number application_message_id = ''.join(random.choices(string.ascii_uppercase +string.digits, k = 7)) try: response = pinpoint_client.send_messages( ApplicationId=application_id, MessageRequest={ 'Addresses': { destinationNumber: { 'ChannelType': 'SMS' } }, 'Context': { 'application_message_id': application_message_id }, 'MessageConfiguration': { 'SMSMessage': { 'Body': message, 'MessageType': message_type } } } ) ddb_client.put_item( TableName=ddb_table, Item={ 'application_message_id': {'S' : application_message_id}, 'pinpoint_message_id' :{'S' : response['MessageResponse']['Result'][destinationNumber]['MessageId']}, 'message_body':{'S' : message}, 'retry_count' :{'N': '0'}, 'all_retries_failed' :{'BOOL': False} } ) except ClientError as e: print(e.response['Error']['Message']) else: print("Message sent! Message ID: "+ response['MessageResponse']['Result'][destinationNumber]['MessageId']) return 'Message sent! Message ID: '+ response['MessageResponse']['Result'][destinationNumber]['MessageId'] Handler: index.lambda_handler FunctionName: !Join - "-" - - "producer_lambda" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" Role: !GetAtt RetryLambdaExecutionRole.Arn Runtime: python3.8 Environment: Variables: dynamodbStateTableName : !Ref DynamoDBTable pinpointApplicationId: !Ref PinpointApplicationId messageType: 'TRANSACTIONAL' destinationNumber: '+61455944039' Timeout: 60 MemorySize: 128 DataStream: Type: AWS::Kinesis::Stream Properties: Name: !Join - "-" - - 'kds_for_pinpoint_sms_retry' - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" RetentionPeriodHours: 24 ShardCount: 1 StreamEncryption: EncryptionType: KMS KeyId: 'alias/aws/kinesis' RetryLambdaExecutionRole: DependsOn: - DynamoDBTable Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Policies: - PolicyName: allowLambdaLogs PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:* - Effect: Allow Action: - dynamodb:GetItem - dynamodb:Scan - dynamodb:Query - dynamodb:PutItem - dynamodb:UpdateItem Resource: !Join ['', ["arn:aws:dynamodb:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":table/", !Ref DynamoDBTable]] - Effect: Allow Action: - kinesis:GetShardIterator - kinesis:GetRecords - kinesis:DescribeStream - kinesis:ListShards - kinesis:ListStreams Resource: !Join ['', ["arn:aws:kinesis:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":stream/", !Ref DataStream]] - Effect: Allow Action: - mobiletargeting:SendMessages Resource: !Join ['', ["arn:aws:mobiletargeting:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":apps/", !Ref PinpointApplicationId, "/messages"]] RetryLambdaFunction: Type: AWS::Lambda::Function DependsOn: - RetryLambdaExecutionRole - DynamoDBTable Properties: Code: ZipFile: | import base64 import boto3 import json import os ddb_client = boto3.client('dynamodb') pinpoint_client = boto3.client('pinpoint') ddb_table = os.environ['dynamodbStateTableName'] pinpoint_application_id = os.environ['pinpointApplicationId'] max_retries = int(os.environ['maxRetries']) def lambda_handler(event, context): #print(event) try: for record in event['Records']: payload = base64.b64decode(record['kinesis']['data']).decode('utf-8') json_payload = json.loads(payload) #print(json_payload) event_type = json_payload['event_type'] record_status = json_payload['attributes']['record_status'] message_id = json_payload['attributes']['message_id'] destination_number_to_resend_sms = json_payload['attributes']['destination_phone_number'] application_message_id = json.loads(json_payload['attributes']['customer_context'])['application_message_id'] if event_type == '_SMS.FAILURE': if record_status=='UNREACHABLE' or record_status=='UNKNOWN' or record_status=='CARRIER_UNREACHABLE' or record_status=='TTL_EXPIRED' or record_status=='BLOCKED': ddb_response = ddb_client.get_item( TableName=ddb_table, Key={'application_message_id': {'S': application_message_id}} ) message_body = ddb_response['Item']['message_body']['S'] retry_count = int(ddb_response['Item']['retry_count']['N']) #if a pinpoint SMS fails and its a genuine record_status, we retry 5 times as default, configurable via maxRetries environment variable. if (retry_count < max_retries): # retry pinpoint_response = pinpoint_client.send_messages( ApplicationId=pinpoint_application_id, MessageRequest={ 'Addresses': { destination_number_to_resend_sms: { 'ChannelType': 'SMS' } }, 'Context': { 'application_message_id': application_message_id }, 'MessageConfiguration': { 'SMSMessage': { 'Body': message_body, 'MessageType': 'TRANSACTIONAL' } } } ) message_id_after_retry = pinpoint_response['MessageResponse']['Result'][destination_number_to_resend_sms]['MessageId'] print("Retry attempt number "+str(retry_count)+ " for application_message_id " +application_message_id+ " . New pinpoint_message_id after retry is " +message_id_after_retry +" . Updating DynamoDB table now") # retry done, increment retry_count and update pinpoint_message_id of new attempt in DynamoDB response = ddb_client.update_item( TableName=ddb_table, UpdateExpression='SET retry_count = retry_count + :incr, pinpoint_message_id = :message_id_after_retry_expression', ExpressionAttributeValues={ ':incr':{ "N": "1" }, ':message_id_after_retry_expression': { "S": message_id_after_retry } }, Key={ 'application_message_id': { 'S': application_message_id, } }, ReturnValues='UPDATED_NEW', ) print("DynamoDB update completed") else: # Placing final flag in the dynamo DB response = ddb_client.update_item( TableName=ddb_table, UpdateExpression='SET all_retries_failed = :all_retries_failed_expression', ExpressionAttributeValues={ ':all_retries_failed_expression': { "BOOL": True } }, Key={ 'application_message_id': { 'S': application_message_id, } }, ReturnValues='UPDATED_NEW', ) print('Performed enough retries i.e. ' +str(max_retries)+ ' giving up now') else: print('shouldnt retry this type of Pinpoint error') else: print('not a SMS related Pinpoint failure event') except Exception as e: print (e) return 'Successfully processed {} records.'.format(len(event['Records'])) Handler: index.lambda_handler FunctionName: !Join - "-" - - "retry_lambda" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" Role: !GetAtt RetryLambdaExecutionRole.Arn Runtime: python3.8 Environment: Variables: dynamodbStateTableName : !Ref DynamoDBTable pinpointApplicationId: !Ref PinpointApplicationId maxRetries: 5 Timeout: 60 MemorySize: 128 EventSourceMapping: DependsOn: - RetryLambdaFunction - DataStream Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 EventSourceArn: !GetAtt DataStream.Arn FunctionName: !GetAtt RetryLambdaFunction.Arn StartingPosition: TRIM_HORIZON PinpointEventStreamConfiguration: Type: AWS::Pinpoint::EventStream Properties: ApplicationId: !Ref PinpointApplicationId DestinationStreamArn: !GetAtt DataStream.Arn RoleArn: !GetAtt PinpointRole.Arn PinpointRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Sid: '' Effect: Allow Principal: Service: pinpoint.amazonaws.com Action: 'sts:AssumeRole' Policies: - PolicyName: allowLambdaLogs PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - mobiletargeting:PutEventStream - mobiletargeting:GetEventStream Resource: !Join ['', ["arn:aws:mobiletargeting:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":apps/", !Ref PinpointApplicationId, "/eventstream"]] - Effect: Allow Action: - kinesis:PutRecord - kinesis:PutRecords - kinesis:DescribeStream - kinesis:ListShards - kinesis:ListStreams Resource: !Join ['', ["arn:aws:kinesis:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":stream/", !Ref DataStream]] Outputs: ProducerLambda: Description: The example lambda function that acts as SMS producer for Pinpoint Value: !Ref PinpointDDBProducerLambdaFunction DDBStateTable: Description: The DynamoDB table where SMS producer lambda stores it's send state and retry lambda reads to perform Pinpoint retry Value: !Ref DynamoDBTable RetryLambda: Description: The lambda function to perform Pinpoint retry using DynamoDB state table and Pinpoint _SMS.FAILURE event Value: !Ref RetryLambdaFunction DataStream: Description: The Kinesis data stream created to receive Pinpoint _SMS.FAILURE event Value: !Ref DataStream