Description: >-
  This template corresponds to the "event stream configured using Kinesis Data Firehose"
  scenario for your Pinpoint project. It deploys a transformation Lambda for an existing
  Amazon Kinesis Data Firehose delivery stream to trigger an SMS retry on the _SMS.FAILURE
  event for the UNREACHABLE, UNKNOWN, CARRIER_UNREACHABLE, TTL_EXPIRED and BLOCKED record
  statuses. If your Kinesis Data Firehose is already configured with a transformation
  Lambda, the stack exits gracefully with a relevant message.
Parameters:
  PinpointApplicationId:
    Description: The ID of your Pinpoint Application
    Type: String
Resources:
  DynamoDBTable:
    Type: AWS::DynamoDB::Table
    Properties:
      TableName: !Join
        - "-"
        - - "pinpoint_message_state_table"
          - !Select
            - 0
            - !Split
              - "-"
              - !Select
                - 2
                - !Split
                  - "/"
                  - !Ref "AWS::StackId"
      BillingMode: PAY_PER_REQUEST
      AttributeDefinitions:
        - AttributeName: application_message_id
          AttributeType: S
      SSESpecification:
        SSEEnabled: true
      KeySchema:
        - AttributeName: application_message_id
          KeyType: HASH
  PinpointDDBProducerLambdaFunction:
    Type: AWS::Lambda::Function
    DependsOn:
      - RetryLambdaExecutionRole
      - DynamoDBTable
    Properties:
      Code:
        ZipFile: |
          import boto3
          from botocore.exceptions import ClientError
          import random
          import datetime
          import os
          import string

          pinpoint_client = boto3.client('pinpoint')
          ddb_client = boto3.client('dynamodb')
          ddb_table = os.environ['dynamodbStateTableName']
          application_id = os.environ['pinpointApplicationId']
          message_type = os.environ['messageType']
          destination_number = os.environ['destinationNumber']

          def lambda_handler(event, context):
              message = ("Sample body of SMS sent from Lambda at " + str(datetime.datetime.now()))
              destinationNumber = destination_number
              # Random 7-character ID that correlates the send request, the DynamoDB
              # state item and the Pinpoint event stream record (via customer_context).
              application_message_id = ''.join(random.choices(string.ascii_uppercase + string.digits, k=7))
              try:
                  response = pinpoint_client.send_messages(
                      ApplicationId=application_id,
                      MessageRequest={
                          'Addresses': {
                              destinationNumber: {
                                  'ChannelType': 'SMS'
                              }
                          },
                          'Context': {
                              'application_message_id': application_message_id
                          },
                          'MessageConfiguration': {
                              'SMSMessage': {
                                  'Body': message,
                                  'MessageType': message_type
                              }
                          }
                      }
                  )
                  ddb_client.put_item(
                      TableName=ddb_table,
                      Item={
                          'application_message_id': {'S': application_message_id},
                          'pinpoint_message_id': {'S': response['MessageResponse']['Result'][destinationNumber]['MessageId']},
                          'message_body': {'S': message},
                          'retry_count': {'N': '0'},
                          'all_retries_failed': {'BOOL': False}
                      }
                  )
              except ClientError as e:
                  print(e.response['Error']['Message'])
              else:
                  print("Message sent! Message ID: " + response['MessageResponse']['Result'][destinationNumber]['MessageId'])
                  return 'Message sent! Message ID: ' + response['MessageResponse']['Result'][destinationNumber]['MessageId']
      Handler: index.lambda_handler
      FunctionName: !Join
        - "-"
        - - "producer_lambda"
          - !Select
            - 0
            - !Split
              - "-"
              - !Select
                - 2
                - !Split
                  - "/"
                  - !Ref "AWS::StackId"
      Role: !GetAtt RetryLambdaExecutionRole.Arn
      Runtime: python3.8
      Environment:
        Variables:
          dynamodbStateTableName: !Ref DynamoDBTable
          pinpointApplicationId: !Ref PinpointApplicationId
          messageType: 'TRANSACTIONAL'
          destinationNumber: '+61455944039'
      Timeout: 60
      MemorySize: 128
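  # For reference, each producer run writes one state item per send. An illustrative
  # item as stored in the state table, derived from the put_item call above (IDs are
  # placeholders, not real values):
  #   {
  #     "application_message_id": {"S": "A1B2C3D"},
  #     "pinpoint_message_id": {"S": "<message id returned by send_messages>"},
  #     "message_body": {"S": "Sample body of SMS sent from Lambda at ..."},
  #     "retry_count": {"N": "0"},
  #     "all_retries_failed": {"BOOL": false}
  #   }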
  RetryLambdaExecutionRole:
    DependsOn:
      - DynamoDBTable
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: allowLambdaLogs
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:CreateLogGroup
                  - logs:CreateLogStream
                  - logs:PutLogEvents
                Resource: !Sub arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*
              - Effect: Allow
                Action:
                  - dynamodb:GetItem
                  - dynamodb:Scan
                  - dynamodb:Query
                  - dynamodb:PutItem
                  - dynamodb:UpdateItem
                Resource: !Join ['', ["arn:aws:dynamodb:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":table/", !Ref DynamoDBTable]]
              - Effect: Allow
                Action:
                  - mobiletargeting:SendMessages
                Resource: !Join ['', ["arn:aws:mobiletargeting:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":apps/", !Ref PinpointApplicationId, "/messages"]]
  RetryLambdaFunction:
    Type: AWS::Lambda::Function
    DependsOn:
      - RetryLambdaExecutionRole
      - DynamoDBTable
    Properties:
      Code:
        ZipFile: |
          import base64
          import boto3
          import json
          import os

          ddb_client = boto3.client('dynamodb')
          pinpoint_client = boto3.client('pinpoint')
          ddb_table = os.environ['dynamodbStateTableName']
          pinpoint_application_id = os.environ['pinpointApplicationId']
          max_retries = int(os.environ['maxRetries'])

          def lambda_handler(event, context):
              try:
                  for record in event['Records']:
                      # Kinesis record data arrives base64-encoded.
                      payload = base64.b64decode(record['kinesis']['data']).decode('utf-8')
                      json_payload = json.loads(payload)
                      event_type = json_payload['event_type']
                      record_status = json_payload['attributes']['record_status']
                      message_id = json_payload['attributes']['message_id']
                      destination_number_to_resend_sms = json_payload['attributes']['destination_phone_number']
                      # customer_context is a JSON string carrying the application_message_id
                      # that the producer set via the send_messages 'Context' field.
                      application_message_id = json.loads(json_payload['attributes']['customer_context'])['application_message_id']
                      if event_type == '_SMS.FAILURE':
                          if record_status in ('UNREACHABLE', 'UNKNOWN', 'CARRIER_UNREACHABLE', 'TTL_EXPIRED', 'BLOCKED'):
                              ddb_response = ddb_client.get_item(
                                  TableName=ddb_table,
                                  Key={'application_message_id': {'S': application_message_id}}
                              )
                              message_body = ddb_response['Item']['message_body']['S']
                              retry_count = int(ddb_response['Item']['retry_count']['N'])
                              # If a Pinpoint SMS fails with a retryable record_status, retry up to
                              # 5 times by default, configurable via the maxRetries environment variable.
                              if retry_count < max_retries:
                                  # retry
                                  pinpoint_response = pinpoint_client.send_messages(
                                      ApplicationId=pinpoint_application_id,
                                      MessageRequest={
                                          'Addresses': {
                                              destination_number_to_resend_sms: {
                                                  'ChannelType': 'SMS'
                                              }
                                          },
                                          'Context': {
                                              'application_message_id': application_message_id
                                          },
                                          'MessageConfiguration': {
                                              'SMSMessage': {
                                                  'Body': message_body,
                                                  'MessageType': 'TRANSACTIONAL'
                                              }
                                          }
                                      }
                                  )
                                  message_id_after_retry = pinpoint_response['MessageResponse']['Result'][destination_number_to_resend_sms]['MessageId']
                                  print("Retry attempt number " + str(retry_count) + " for application_message_id " + application_message_id + ". New pinpoint_message_id after retry is " + message_id_after_retry + ". Updating DynamoDB table now")
                                  # Retry done; increment retry_count and record the new attempt's
                                  # pinpoint_message_id in DynamoDB.
                                  response = ddb_client.update_item(
                                      TableName=ddb_table,
                                      UpdateExpression='SET retry_count = retry_count + :incr, pinpoint_message_id = :message_id_after_retry_expression',
                                      ExpressionAttributeValues={
                                          ':incr': {"N": "1"},
                                          ':message_id_after_retry_expression': {"S": message_id_after_retry}
                                      },
                                      Key={
                                          'application_message_id': {'S': application_message_id}
                                      },
                                      ReturnValues='UPDATED_NEW'
                                  )
                                  print("DynamoDB update completed")
                              else:
                                  # Retry budget exhausted; set the final flag in DynamoDB.
                                  response = ddb_client.update_item(
                                      TableName=ddb_table,
                                      UpdateExpression='SET all_retries_failed = :all_retries_failed_expression',
                                      ExpressionAttributeValues={
                                          ':all_retries_failed_expression': {"BOOL": True}
                                      },
                                      Key={
                                          'application_message_id': {'S': application_message_id}
                                      },
                                      ReturnValues='UPDATED_NEW'
                                  )
                                  print('Performed enough retries, i.e. ' + str(max_retries) + ', giving up now')
                          else:
                              print("shouldn't retry this type of Pinpoint error")
                      else:
                          print('not an SMS-related Pinpoint failure event')
              except Exception as e:
                  print(e)
              return 'Successfully processed {} records.'.format(len(event['Records']))
      Handler: index.lambda_handler
      FunctionName: !Join
        - "-"
        - - "retry_lambda"
          - !Select
            - 0
            - !Split
              - "-"
              - !Select
                - 2
                - !Split
                  - "/"
                  - !Ref "AWS::StackId"
      Role: !GetAtt RetryLambdaExecutionRole.Arn
      Runtime: python3.8
      Environment:
        Variables:
          dynamodbStateTableName: !Ref DynamoDBTable
          pinpointApplicationId: !Ref PinpointApplicationId
          maxRetries: '5'
      Timeout: 60
      MemorySize: 128
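  # For reference, a trimmed, illustrative _SMS.FAILURE record as the retry Lambda
  # above sees it after base64-decoding the Kinesis payload. The field names match
  # what the handler parses; the values are placeholders:
  #   {
  #     "event_type": "_SMS.FAILURE",
  #     "attributes": {
  #       "record_status": "UNREACHABLE",
  #       "message_id": "<pinpoint message id>",
  #       "destination_phone_number": "+61400000000",
  #       "customer_context": "{\"application_message_id\": \"A1B2C3D\"}"
  #     }
  #   }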
  AddTransformationLambdaToExistingDeliveryStream:
    Type: Custom::addTransLambda
    DependsOn:
      - RetryLambdaFunction
    Properties:
      ServiceToken: !GetAtt HelperLambdaToAddTransformationLambdaWithExistingDeliveryStream.Arn
      PinpointProjectID: !Ref PinpointApplicationId
      TransformationLambda: !GetAtt RetryLambdaFunction.Arn
  HelperLambdaExecutionRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - sts:AssumeRole
      Policies:
        - PolicyName: allowLambdaLogs
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - logs:*
                Resource: arn:aws:logs:*:*:*
        - PolicyName: allowFirehoseAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - firehose:*
                Resource: arn:aws:firehose:*:*:*
        - PolicyName: allowPutRolePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - iam:PutRolePolicy
                Resource: !Join ['', ["arn:aws:iam::", !Ref AWS::AccountId, ":role/*"]]
        - PolicyName: allowPinpointAccess
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - mobiletargeting:PutEventStream
                  - mobiletargeting:GetEventStream
                Resource: !Join ['', ["arn:aws:mobiletargeting:", !Ref AWS::Region, ":", !Ref AWS::AccountId, ":apps/", !Ref PinpointApplicationId, "/eventstream"]]
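  # To verify the wiring that the custom resource above performs, the delivery
  # stream's processing configuration can be inspected after stack creation
  # (assumed CLI usage; substitute your own delivery stream name):
  #   aws firehose describe-delivery-stream --delivery-stream-name <stream-name> \
  #     --query 'DeliveryStreamDescription.Destinations[0].ExtendedS3DestinationDescription.ProcessingConfiguration'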
  HelperLambdaToAddTransformationLambdaWithExistingDeliveryStream:
    Type: AWS::Lambda::Function
    Properties:
      Code:
        ZipFile: |
          import json
          import boto3
          import cfnresponse

          firehose_client = boto3.client('firehose')
          iam_client = boto3.client('iam')

          def add_trans_lambda_to_fhose(event, streamName, firehose_client, firehose_response, responseData, iam_client):
              transformationLambdaARN = event['ResourceProperties']['TransformationLambda']
              if "S3DestinationDescription" in firehose_response['DeliveryStreamDescription']['Destinations'][0]:
                  response = firehose_client.update_destination(
                      DeliveryStreamName=streamName,
                      CurrentDeliveryStreamVersionId=firehose_response['DeliveryStreamDescription']['VersionId'],
                      DestinationId=firehose_response['DeliveryStreamDescription']['Destinations'][0]['DestinationId'],
                      ExtendedS3DestinationUpdate={
                          'ProcessingConfiguration': {
                              'Enabled': True,
                              'Processors': [
                                  {
                                      'Type': 'Lambda',
                                      'Parameters': [
                                          {
                                              'ParameterName': 'LambdaArn',
                                              'ParameterValue': transformationLambdaARN
                                          }
                                      ]
                                  }
                              ]
                          }
                      }
                  )
                  responseData['Data'] = "Added a transformation lambda to existing Kinesis Firehose stream"
                  # Modify the IAM role attached to the delivery stream to add a Lambda
                  # invoke inline policy.
                  elements = firehose_response['DeliveryStreamDescription']['Destinations'][0]['ExtendedS3DestinationDescription']['RoleARN'].split(':')
                  # The following condition checks whether the Firehose uses a service role,
                  # in which case the resource part splits into 3 elements, e.g.
                  # "role/service-role/KinesisFirehoseServiceRole-PUT-S3-E-ap-southeast-1-1652xxxxxxxx".
                  # For a custom-created role the array length is 2, e.g.
                  # "role/event-db1-PinpointKinesisFirehoseRole-1I1IUxxxxxx".
                  iam_elements = elements[5].split('/')
                  if len(iam_elements) == 2:
                      iam_role_attached_to_firehose = iam_elements[1]
                  else:
                      iam_role_attached_to_firehose = iam_elements[2]
                  response = iam_client.put_role_policy(
                      PolicyDocument='{"Version":"2012-10-17","Statement":{"Effect": "Allow", "Action": ["lambda:InvokeFunction", "lambda:GetFunctionConfiguration"],"Resource": ["' + transformationLambdaARN + '"]}}',
                      PolicyName='AllowLambdaInvoke',
                      RoleName=iam_role_attached_to_firehose
                  )
              else:
                  responseData['Data'] = "This Amazon Kinesis Data Firehose has a non-S3 destination, exiting without modifying existing Pinpoint event stream pipeline"
              return responseData

          def lambda_handler(event, context):
              try:
                  responseData = {}
                  if event['RequestType'] == 'Delete':
                      responseData['Data'] = "Stack being deleted, do nothing"
                      cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData)
                  else:
                      projectID = event['ResourceProperties']['PinpointProjectID']
                      pinpoint_client = boto3.client('pinpoint')
                      pinpoint_response = pinpoint_client.get_event_stream(
                          ApplicationId=projectID
                      )
                      elements = pinpoint_response['EventStream']['DestinationStreamArn'].split(':')
                      if elements[2] == "firehose":
                          # Determine the delivery stream name from the Pinpoint event stream
                          # ARN and add RetryLambdaFunction as the transformation lambda; if a
                          # transformation lambda already exists, report it via 'Data'.
                          streamName = elements[5].split('/')[1]
                          firehose_response = firehose_client.describe_delivery_stream(
                              DeliveryStreamName=streamName
                          )
                          destination_description = firehose_response['DeliveryStreamDescription']['Destinations'][0]['ExtendedS3DestinationDescription']
                          if 'ProcessingConfiguration' in destination_description:
                              processingConfigured = destination_description['ProcessingConfiguration']['Enabled']
                              if processingConfigured:
                                  responseData['Data'] = "This Amazon Kinesis Data Firehose already has a transformation lambda configured, exiting without modifying existing Pinpoint event stream pipeline"
                                  cfnresponse.send(event, context, cfnresponse.FAILED, responseData)
                              else:
                                  responseData = add_trans_lambda_to_fhose(event, streamName, firehose_client, firehose_response, responseData, iam_client)
                                  cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData)
                          else:
                              responseData = add_trans_lambda_to_fhose(event, streamName, firehose_client, firehose_response, responseData, iam_client)
                              cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData)
                      else:
                          # Always respond, otherwise CloudFormation waits until timeout.
                          responseData['Data'] = "Pinpoint event stream destination is not a Kinesis Data Firehose, exiting without changes"
                          cfnresponse.send(event, context, cfnresponse.SUCCESS, responseData)
              except Exception as e:
                  print(e)
                  responseData['Data'] = str(e)
                  cfnresponse.send(event, context, cfnresponse.FAILED, responseData)
      FunctionName: !Join
        - "-"
        - - "add_trans_lambda_to_fhose"
          - !Select
            - 0
            - !Split
              - "-"
              - !Select
                - 2
                - !Split
                  - "/"
                  - !Ref "AWS::StackId"
      MemorySize: 128
      Runtime: python3.8
      Description: Lambda function that executes during stack creation and deletion
      Handler: index.lambda_handler
      Role: !GetAtt HelperLambdaExecutionRole.Arn
      Timeout: 900
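  # After a successful run, the helper's result message is surfaced through the
  # TransformationLambdaState stack output, which can be read back with, e.g.
  # (assumed stack name):
  #   aws cloudformation describe-stacks --stack-name <stack-name> \
  #     --query 'Stacks[0].Outputs[?OutputKey==`TransformationLambdaState`].OutputValue'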
Outputs:
  ProducerLambda:
    Description: The example Lambda function that acts as the SMS producer for Pinpoint
    Value: !Ref PinpointDDBProducerLambdaFunction
  DDBStateTable:
    Description: The DynamoDB table where the SMS producer Lambda stores its send state and the retry Lambda reads to perform a Pinpoint retry
    Value: !Ref DynamoDBTable
  RetryLambda:
    Description: The Lambda function that performs the Pinpoint retry using the DynamoDB state table and the Pinpoint _SMS.FAILURE event
    Value: !Ref RetryLambdaFunction
  TransformationLambdaState:
    Description: The result of adding the retry logic as a transformation lambda to the delivery stream
    Value: !GetAtt AddTransformationLambdaToExistingDeliveryStream.Data
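# A minimal deployment sketch (assumed template file and stack names; CAPABILITY_IAM
# is required because the template creates IAM roles):
#   aws cloudformation deploy \
#     --template-file pinpoint-sms-retry.yaml \
#     --stack-name pinpoint-sms-retry \
#     --parameter-overrides PinpointApplicationId=<your-pinpoint-project-id> \
#     --capabilities CAPABILITY_IAM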