Description: This template corresponds to "event stream configured using Kinesis Data Stream" scenario with your Pinpoint project and hence deploys Amazon Kinesis Data Firehose (as consumer for existing Kinesis data stream) along with Transformation Lambda to trigger SMS retry on _SMS.FAILURE event for UNREACHABLE, UNKNOWN, CARRIER_UNREACHABLE & TTL_EXPIRED record status. If your Kinesis Firehose is already configured with a transformtion lambda then the stack exists gracefully with relevant message Parameters: PinpointApplicationId: Description: The ID of your Pinpoint Application Type: String Resources: DynamoDBTable: Type: AWS::DynamoDB::Table Properties: TableName: !Join - "-" - - "pinpoint_message_state_table" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" BillingMode: PAY_PER_REQUEST AttributeDefinitions: - AttributeName: application_message_id AttributeType: S SSESpecification: SSEEnabled: True KeySchema: - AttributeName: application_message_id KeyType: HASH PinpointDDBProducerLambdaFunction: Type: AWS::Lambda::Function DependsOn: - RetryLambdaExecutionRole - DynamoDBTable Properties: Code: ZipFile: | import json import boto3 import botocore from botocore.exceptions import ClientError import random import datetime import os import string pinpoint_client = boto3.client('pinpoint') ddb_client = boto3.client('dynamodb') ddb_table = os.environ['dynamodbStateTableName'] application_id = os.environ['pinpointApplicationId'] message_type = os.environ['messageType'] destination_number = os.environ['destinationNumber'] def lambda_handler(event, context): message = ("Sample body of SMS sent from Lambda at "+ str(datetime.datetime.now())) destinationNumber = destination_number application_message_id = ''.join(random.choices(string.ascii_uppercase +string.digits, k = 7)) try: response = pinpoint_client.send_messages( ApplicationId=application_id, MessageRequest={ 'Addresses': { destinationNumber: { 'ChannelType': 'SMS' } }, 'Context': { 'application_message_id': application_message_id }, 'MessageConfiguration': { 'SMSMessage': { 'Body': message, 'MessageType': message_type } } } ) ddb_client.put_item( TableName=ddb_table, Item={ 'application_message_id': {'S' : application_message_id}, 'pinpoint_message_id' :{'S' : response['MessageResponse']['Result'][destinationNumber]['MessageId']}, 'message_body':{'S' : message}, 'retry_count' :{'N': '0'}, 'all_retries_failed' :{'BOOL': False} } ) except ClientError as e: print(e.response['Error']['Message']) else: print("Message sent! Message ID: "+ response['MessageResponse']['Result'][destinationNumber]['MessageId']) return 'Message sent! Message ID: '+ response['MessageResponse']['Result'][destinationNumber]['MessageId'] Handler: index.lambda_handler FunctionName: !Join - "-" - - "producer_lambda" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" Role: !GetAtt RetryLambdaExecutionRole.Arn Runtime: python3.8 Environment: Variables: dynamodbStateTableName : !Ref DynamoDBTable pinpointApplicationId: !Ref PinpointApplicationId messageType: 'TRANSACTIONAL' destinationNumber: '+61455944039' Timeout: 60 MemorySize: 128 RetryLambdaExecutionRole: DependsOn: - DynamoDBTable - CustomResourceToAddLambdaAsConsumerForExistingKinesisStream Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Policies: - PolicyName: allowLambdaLogs PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutLogEvents Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:* - Effect: Allow Action: - dynamodb:GetItem - dynamodb:Scan - dynamodb:Query - dynamodb:PutItem - dynamodb:UpdateItem Resource: !Join ['', ["arn:aws:dynamodb:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":table/", !Ref DynamoDBTable]] - Effect: Allow Action: - kinesis:GetShardIterator - kinesis:GetRecords - kinesis:DescribeStream - kinesis:ListShards - kinesis:ListStreams Resource: !GetAtt CustomResourceToAddLambdaAsConsumerForExistingKinesisStream.DestinationStreamArn - Effect: Allow Action: - mobiletargeting:SendMessages Resource: !Join ['', ["arn:aws:mobiletargeting:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":apps/", !Ref PinpointApplicationId, "/messages"]] RetryLambdaFunction: Type: AWS::Lambda::Function DependsOn: - RetryLambdaExecutionRole - DynamoDBTable Properties: Code: ZipFile: | import base64 import boto3 import json import os ddb_client = boto3.client('dynamodb') pinpoint_client = boto3.client('pinpoint') ddb_table = os.environ['dynamodbStateTableName'] pinpoint_application_id = os.environ['pinpointApplicationId'] max_retries = int(os.environ['maxRetries']) def lambda_handler(event, context): #print(event) try: for record in event['Records']: payload = base64.b64decode(record['kinesis']['data']).decode('utf-8') json_payload = json.loads(payload) #print(json_payload) event_type = json_payload['event_type'] record_status = json_payload['attributes']['record_status'] message_id = json_payload['attributes']['message_id'] destination_number_to_resend_sms = json_payload['attributes']['destination_phone_number'] application_message_id = json.loads(json_payload['attributes']['customer_context'])['application_message_id'] if event_type == '_SMS.FAILURE': if record_status=='UNREACHABLE' or record_status=='UNKNOWN' or record_status=='CARRIER_UNREACHABLE' or record_status=='TTL_EXPIRED' or record_status=='BLOCKED': ddb_response = ddb_client.get_item( TableName=ddb_table, Key={'application_message_id': {'S': application_message_id}} ) message_body = ddb_response['Item']['message_body']['S'] retry_count = int(ddb_response['Item']['retry_count']['N']) #if a pinpoint SMS fails and its a genuine record_status, we retry 5 times as default, configurable via maxRetries environment variable. if (retry_count < max_retries): # retry pinpoint_response = pinpoint_client.send_messages( ApplicationId=pinpoint_application_id, MessageRequest={ 'Addresses': { destination_number_to_resend_sms: { 'ChannelType': 'SMS' } }, 'Context': { 'application_message_id': application_message_id }, 'MessageConfiguration': { 'SMSMessage': { 'Body': message_body, 'MessageType': 'TRANSACTIONAL' } } } ) message_id_after_retry = pinpoint_response['MessageResponse']['Result'][destination_number_to_resend_sms]['MessageId'] print("Retry attempt number "+str(retry_count)+ " for application_message_id " +application_message_id+ " . New pinpoint_message_id after retry is " +message_id_after_retry +" . Updating DynamoDB table now") # retry done, increment retry_count and update pinpoint_message_id of new attempt in DynamoDB response = ddb_client.update_item( TableName=ddb_table, UpdateExpression='SET retry_count = retry_count + :incr, pinpoint_message_id = :message_id_after_retry_expression', ExpressionAttributeValues={ ':incr':{ "N": "1" }, ':message_id_after_retry_expression': { "S": message_id_after_retry } }, Key={ 'application_message_id': { 'S': application_message_id, } }, ReturnValues='UPDATED_NEW', ) print("DynamoDB update completed") else: # Placing final flag in the dynamo DB response = ddb_client.update_item( TableName=ddb_table, UpdateExpression='SET all_retries_failed = :all_retries_failed_expression', ExpressionAttributeValues={ ':all_retries_failed_expression': { "BOOL": True } }, Key={ 'application_message_id': { 'S': application_message_id, } }, ReturnValues='UPDATED_NEW', ) print('Performed enough retries i.e. ' +str(max_retries)+ ' giving up now') else: print('shouldnt retry this type of Pinpoint error') else: print('not a SMS related Pinpoint failure event') except Exception as e: print (e) return 'Successfully processed {} records.'.format(len(event['Records'])) Handler: index.lambda_handler FunctionName: !Join - "-" - - "retry_lambda" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" Role: !GetAtt RetryLambdaExecutionRole.Arn Runtime: python3.8 Environment: Variables: dynamodbStateTableName : !Ref DynamoDBTable pinpointApplicationId: !Ref PinpointApplicationId maxRetries: 5 Timeout: 60 MemorySize: 128 EventSourceMapping: DependsOn: - RetryLambdaFunction Type: AWS::Lambda::EventSourceMapping Properties: BatchSize: 1 EventSourceArn: !GetAtt CustomResourceToAddLambdaAsConsumerForExistingKinesisStream.DestinationStreamArn FunctionName: !GetAtt RetryLambdaFunction.Arn StartingPosition: TRIM_HORIZON CustomResourceToAddLambdaAsConsumerForExistingKinesisStream: Type: Custom::addTransLambda Properties: ServiceToken: !GetAtt HelperLambdaToDetermineKinesisDataStreamForPinpointProject.Arn PinpointProjectID: !Ref PinpointApplicationId HelperLambdaExecutionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Policies: - PolicyName: allowLambdaLogs PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - logs:* Resource: arn:aws:logs:*:*:* - PolicyName: allowPinpointAccess PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - mobiletargeting:GetEventStream Resource: !Join ['', ["arn:aws:mobiletargeting:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":apps/", !Ref PinpointApplicationId, "/eventstream"]] HelperLambdaToDetermineKinesisDataStreamForPinpointProject: Type: AWS::Lambda::Function DependsOn: - HelperLambdaExecutionRole Properties: Code: ZipFile: | import json import boto3 import cfnresponse def lambda_handler(event, context): try: projectID = event['ResourceProperties']['PinpointProjectID'] pinpoint_client = boto3.client('pinpoint') pinpoint_response = pinpoint_client.get_event_stream( ApplicationId=projectID ) #determine kinesis id from pinpoint id and send a string msg via 'Data' to DeliveryStream.KinesisStreamSourceConfiguration.KinesisStreamARN responseData = {} responseData['DestinationStreamArn'] = pinpoint_response['EventStream']['DestinationStreamArn'] cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData) except Exception as e: print(e) responseData = {"DestinationStreamArn": str(err)} cfnresponse.send(event, context, cfnresponse.FAILED, responseData) FunctionName: !Join - "-" - - "add_lambda_as_kinesis_consumer" - !Select - 0 - !Split - "-" - !Select - 2 - !Split - "/" - !Ref "AWS::StackId" MemorySize: 128 Runtime: python3.6 Description: Lambda function that executes during stack creation and deletion to achieve CleanupBucketOnStackDelete Handler: index.lambda_handler Role: !GetAtt HelperLambdaExecutionRole.Arn Timeout: 900 Outputs: ProducerLambda: Description: The example lambda function that acts as SMS producer for Pinpoint Value: !Ref PinpointDDBProducerLambdaFunction DDBStateTable: Description: The DynamoDB table where SMS producer lambda stores it's send state and retry lambda reads to perform Pinpoint retry Value: !Ref DynamoDBTable RetryLambda: Description: The lambda function to perform Pinpoint retry using DynamoDB state table and Pinpoint _SMS.FAILURE event Value: !Ref RetryLambdaFunction