AWSTemplateFormatVersion: '2010-09-09'
Description: Event Based Segmentation

# Streams Amazon Pinpoint events through a Kinesis stream into a Lambda
# function that writes endpoint attributes back to Pinpoint, so segments can
# be built from those attributes.

Parameters:
  PinpointProjectId:
    Type: String
    # An empty default makes "leave blank to create one" work without the
    # caller having to pass an explicit empty value.
    Default: ''
    Description: Amazon Pinpoint Project ID if one already exists, blank to create one
  PinpointProjectName:
    Type: String
    Default: 'My Pinpoint Project'
    Description: 'If no PinpointProjectId provided, name to be used to create the Pinpoint project'

Conditions:
  # True when no existing project ID was supplied and one must be created.
  NeedsPinpointProjectId: !Equals
    - ''
    - !Ref PinpointProjectId

Resources:
  # Created only when the caller did not supply an existing project ID.
  PinpointApplication:
    Type: AWS::Pinpoint::App
    Condition: NeedsPinpointProjectId
    Properties:
      Name: !Ref PinpointProjectName

  # Sends the project's events to the Kinesis stream below.
  PinpointEventStream:
    Type: AWS::Pinpoint::EventStream
    Properties:
      ApplicationId: !If
        - NeedsPinpointProjectId
        - !Ref PinpointApplication
        - !Ref PinpointProjectId
      DestinationStreamArn: !GetAtt PinpointEventKinesis.Arn
      RoleArn: !GetAtt PinpointKinesisStreamRole.Arn

  # Single-shard stream, encrypted with the AWS managed Kinesis key.
  PinpointEventKinesis:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: 1
      StreamEncryption:
        EncryptionType: KMS
        KeyId: alias/aws/kinesis

  # Role assumed by Amazon Pinpoint to put event records on the stream.
  PinpointKinesisStreamRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                # The trust policy must name the service that assumes the
                # role; an empty principal is rejected by IAM.
                - pinpoint.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: '/'
      Policies:
        - PolicyName: 'root'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: 'Allow'
                Action:
                  - 'kinesis:PutRecords'
                  - 'kinesis:DescribeStream'
                Resource: !GetAtt PinpointEventKinesis.Arn

  # Consumes the event stream and updates endpoint attributes in Pinpoint.
  EventProcessingLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt EventProcessingLambdaRole.Arn
      # python3.7 is a deprecated Lambda runtime and can no longer be used
      # to create functions; use a currently supported Python runtime.
      Runtime: 'python3.12'
      Timeout: 60
      Environment:
        Variables:
          LOG_LEVEL: 'INFO'
          PINPOINT_PROJECT_ID: !If
            - NeedsPinpointProjectId
            - !Ref PinpointApplication
            - !Ref PinpointProjectId
      Code:
        ZipFile: |
          import base64
          import json
          import logging
          import os

          import boto3

          pinpoint = boto3.client('pinpoint')

          VALID_LOG_LEVELS = ('DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL')


          def lambda_handler(event, context):
              """Update Pinpoint endpoint attributes from streamed events.

              Iterates over event['Records'], decodes each Kinesis payload
              and, for the event types handled below, calls update_endpoint
              on the project named by PINPOINT_PROJECT_ID. A failure on one
              record is logged and does not stop the remaining records.
              """
              # Fall back to ERROR when LOG_LEVEL is unset or not recognised.
              log_level = str(os.environ.get('LOG_LEVEL')).upper()
              if log_level not in VALID_LOG_LEVELS:
                  log_level = 'ERROR'
              logging.getLogger().setLevel(log_level)

              for record in event['Records']:
                  try:
                      # Kinesis data is base64 encoded so decode here
                      payload = base64.b64decode(record['kinesis']['data'])
                      logging.info('Found Event: %s', payload)

                      pinpoint_event = json.loads(payload)
                      event_type = pinpoint_event['event_type']

                      if event_type == '_email.open':
                          # We have an email open event, update the endpoint
                          # with the attribute "opened_email" = "true"
                          pinpoint.update_endpoint(
                              ApplicationId=os.environ['PINPOINT_PROJECT_ID'],
                              # this is true for campaign sends
                              EndpointId=pinpoint_event['client']['client_id'],
                              EndpointRequest={
                                  'Attributes': {
                                      'opened_email': ['true']
                                  }
                              }
                          )
                      elif event_type == '_custom.registered_for_webinar5':
                          # We have a custom event via the Pinpoint PutEvents
                          # API, update attribute
                          pinpoint.update_endpoint(
                              ApplicationId=os.environ['PINPOINT_PROJECT_ID'],
                              EndpointId=pinpoint_event['client']['client_id'],
                              EndpointRequest={
                                  'Attributes': {
                                      'webinar_registration': ['webinar5']
                                  }
                              }
                          )
                  except Exception as e:
                      # Best effort: log and continue with the next record.
                      logging.error(
                          'Received Error while processing Kinesis record: %s', e)

  # Execution role for the event-processing Lambda function.
  EventProcessingLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                # Lambda must be the trusted principal for an execution role.
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: '/'
      ManagedPolicyArns:
        - 'arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole'
      Policies:
        - PolicyName: 'LambdaExecutionPolicy'
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              # Allow endpoint updates only within the selected project.
              - Effect: 'Allow'
                Action:
                  - 'mobiletargeting:UpdateEndpoint'
                Resource: !Sub
                  - 'arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${ProjectId}*'
                  - ProjectId: !If
                      - NeedsPinpointProjectId
                      - !Ref PinpointApplication
                      - !Ref PinpointProjectId
              - Effect: 'Allow'
                Action:
                  - 'logs:CreateLogGroup'
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*'

  # Invokes the Lambda function with records from the Kinesis stream,
  # starting from the oldest record still available in the stream.
  EventProcessingLambdaTrigger:
    Type: AWS::Lambda::EventSourceMapping
    Properties:
      Enabled: true
      EventSourceArn: !GetAtt PinpointEventKinesis.Arn
      FunctionName: !GetAtt EventProcessingLambda.Arn
      StartingPosition: 'TRIM_HORIZON'