AWSTemplateFormatVersion: "2010-09-09"
Description: Salesforce to Pinpoint Sync using AppFlow

# Overview
#   AppFlow writes Salesforce Contact records (JSON lines) to an S3 bucket.
#   * EventProcessingLambda is invoked for every object created under the
#     configured prefix and upserts one Pinpoint endpoint per channel
#     (Email / MobilePhone / HomePhone) for each Salesforce record.
#   * LambdaTrigger (custom resource backed by CustomResourceLambdaFunction)
#     writes the S3 bucket notification that invokes EventProcessingLambda.
#     NOTE(review): the custom resource replaces the bucket's whole notification
#     configuration on create/update and clears it on delete, so any other
#     notifications configured on the same bucket would be lost - confirm the
#     bucket is dedicated to this flow.

Parameters:
  PinpointProjectId:
    Type: String
    # Empty default so that "blank to create one" works without having to pass
    # an explicit empty value.
    Default: ""
    Description: Amazon Pinpoint Project ID if one already exists, blank to create one
  PinpointProjectName:
    Type: String
    Default: "My Salesforce Synced Pinpoint Project"
    Description: "If no PinpointProjectId provided, name to be used to create the Pinpoint project"
  AppFlowS3Bucket:
    Type: String
    Description: Name of the S3 bucket that AppFlow uses
  S3ContactsPrefix:
    Type: String
    Description: S3 Prefix that AppFlow uses for Salesforce Contact Objects
  SalesforceFieldsToSync:
    Type: String
    Description: Comma delimited list of Salesforce fields to sync to Pinpoint, limit 40
    Default: Name.LastName,Name.FirstName,MailingPostalCode,MailingCountry,MailingState

Conditions:
  # True when no existing project ID was supplied, i.e. a project must be created.
  NeedsPinpointProjectId: !Equals
    - ''
    - !Ref PinpointProjectId

Resources:
  PinpointApplication:
    Type: AWS::Pinpoint::App
    Condition: NeedsPinpointProjectId
    Properties:
      Name: !Ref PinpointProjectName

  EventProcessingLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt EventProcessingLambdaRole.Arn
      # python3.7 is a deprecated Lambda runtime; use a supported one.
      Runtime: "python3.12"
      Timeout: 180
      MemorySize: 1024
      Environment:
        Variables:
          LOG_LEVEL: "INFO"
          SF_FIELDS: !Ref SalesforceFieldsToSync
          PINPOINT_PROJECT_ID: !If
            - NeedsPinpointProjectId
            - !Ref PinpointApplication
            - !Ref PinpointProjectId
      Code:
        # ZipFile has 4096 character limit, see below comments for full un-shortened source.
        # The inline source therefore uses short names and 2-space indentation:
        #   afd = AppFlow data (one JSON line), ce = channel endpoint,
        #   ef  = verify_phone_address, pc = prepare_channel_endpoint,
        #   se  = send_endpoint,        re = recursively_add_user_attrs
        # Keep the shortened and un-shortened versions in sync when editing.
        ZipFile: |
          import boto3
          import logging
          import os
          import json
          import copy
          from urllib.parse import unquote_plus
          from botocore.exceptions import ClientError

          pinpoint = boto3.client('pinpoint')
          s3 = boto3.resource('s3')
          channel_types = {'Email': 'EMAIL', 'MobilePhone': 'SMS', 'HomePhone': 'VOICE'}

          def lambda_handler(event, context):
            logging.getLogger().setLevel('INFO')
            logging.info(event)
            sf_fields = os.environ.get('SF_FIELDS').split(',')
            # Loop over all added files from S3
            for record in event['Records']:
              bucket = record['s3']['bucket']['name']
              key = unquote_plus(record['s3']['object']['key'])
              obj = s3.Object(bucket, key)
              try:
                # Loop over each AppFlow Sync event in the S3 file
                for line in obj.get()['Body'].iter_lines():
                  endpoint = {'User': {'UserAttributes': {}}}
                  afd = json.loads(line)
                  logging.info(afd)
                  for sf_field in sf_fields:
                    endpoint = re(endpoint, afd, sf_field)
                  if 'ChangeEventHeader' in afd and afd['ChangeEventHeader']['changeType'] != 'DELETE':
                    for sfId in afd['ChangeEventHeader']['recordIds']:
                      for et in ['Email', 'MobilePhone', 'HomePhone']:
                        ce = pc(endpoint, et, sfId)
                        # If an endpoint address is in the data, add address too
                        if et in afd:
                          ce['Address'] = afd[et] if et == 'Email' else ef(afd[et])
                          if ce['Address'] is not None:
                            se(ce, '%s%s' % (sfId, et))
                        elif afd['ChangeEventHeader']['changeType'] == 'UPDATE':
                          se(ce, '%s%s' % (sfId, et))
                  elif 'ChangeEventHeader' in afd and afd['ChangeEventHeader']['changeType'] == 'DELETE':
                    for sfId in afd['ChangeEventHeader']['recordIds']:
                      for et in ['Email', 'MobilePhone', 'HomePhone']:
                        ce = {'EndpointStatus': 'INACTIVE'}
                        se(ce, '%s%s' % (sfId, et))
                  elif afd['Id'] and not afd['IsDeleted']:
                    for et in ['Email', 'MobilePhone', 'HomePhone']:
                      ce = pc(endpoint, et, afd['Id'])
                      if et in afd and afd[et] is not None:
                        addr = afd[et] if et == 'Email' else ef(afd[et])
                        if addr is not None:
                          ce['Address'] = addr
                          se(ce, '%s%s' % (afd['Id'], et))
              except Exception as e:
                logging.exception(e)

          def ef(num):
            if not num:
              return None
            try:
              r = pinpoint.phone_number_validate(NumberValidateRequest={'PhoneNumber': num})
              return r['NumberValidateResponse']['CleansedPhoneNumberE164']
            except ClientError:
              return None

          def pc(endpoint, et, sfId):
            ce = copy.deepcopy(endpoint)
            ce['ChannelType'] = channel_types[et]
            ce['User']['UserId'] = sfId
            return ce

          def se(endpoint, id):
            response = pinpoint.update_endpoint(
              ApplicationId=os.environ['PINPOINT_PROJECT_ID'],
              EndpointId=id,
              EndpointRequest=endpoint
            )
            logging.info(response)

          def re(endpoint, afd, field):
            if '.' in field:
              [sub_field, new_field] = field.split('.', 1)
              if isinstance(afd.get(sub_field), dict):
                re(endpoint, afd[sub_field], new_field)
            elif field in afd and afd[field] is not None:
              endpoint['User']['UserAttributes'][field] = [afd[field]]
            return endpoint

# Full / Un-shortened Lambda Code
# import boto3
# import logging
# import os
# import json
# import copy
# from urllib.parse import unquote_plus
# from botocore.exceptions import ClientError
#
# pinpoint = boto3.client('pinpoint')
# s3 = boto3.resource('s3')
#
# channel_types = {'Email': 'EMAIL', 'MobilePhone': 'SMS', 'HomePhone': 'VOICE'}
#
# def lambda_handler(event, context):
#
#     logging.getLogger().setLevel('INFO')
#     logging.info(event)
#
#     sf_fields = os.environ.get('SF_FIELDS').split(',')
#
#     # Loop over all added files from S3
#     for record in event['Records']:
#         bucket = record['s3']['bucket']['name']
#         # Object keys in S3 event notifications are URL-encoded
#         key = unquote_plus(record['s3']['object']['key'])
#
#         obj = s3.Object(bucket, key)
#
#         try:
#
#             # Loop over each AppFlow Sync event in the S3 file
#             for line in obj.get()['Body'].iter_lines():
#
#                 endpoint = {
#                     'User': {
#                         'UserAttributes': { }
#                     }
#                 }
#
#                 appflow_data = json.loads(line)
#                 logging.info(appflow_data)
#
#                 for sf_field in sf_fields:
#                     endpoint = recursively_add_user_attrs(endpoint, appflow_data, sf_field)
#
#                 if 'ChangeEventHeader' in appflow_data and appflow_data['ChangeEventHeader']['changeType'] != 'DELETE':
#
#                     for salesforce_id in appflow_data['ChangeEventHeader']['recordIds']:
#                         for endpoint_type in ['Email', 'MobilePhone', 'HomePhone']:
#
#                             channel_endpoint = prepare_channel_endpoint(endpoint, endpoint_type, salesforce_id)
#
#                             # If an endpoint address is in the data, add address too
#                             if endpoint_type in appflow_data:
#                                 channel_endpoint['Address'] = appflow_data[endpoint_type] if endpoint_type == 'Email' else verify_phone_address(appflow_data[endpoint_type])
#                                 if channel_endpoint['Address'] is not None:
#                                     send_endpoint(channel_endpoint, '%s%s' % (salesforce_id, endpoint_type))
#
#                             elif appflow_data['ChangeEventHeader']['changeType'] == 'UPDATE':
#                                 send_endpoint(channel_endpoint, '%s%s' % (salesforce_id, endpoint_type))
#
#                 elif 'ChangeEventHeader' in appflow_data and appflow_data['ChangeEventHeader']['changeType'] == 'DELETE':
#                     for salesforce_id in appflow_data['ChangeEventHeader']['recordIds']:
#                         for endpoint_type in ['Email', 'MobilePhone', 'HomePhone']:
#                             channel_endpoint = {
#                                 'EndpointStatus': 'INACTIVE'
#                             }
#                             send_endpoint(channel_endpoint, '%s%s' % (salesforce_id, endpoint_type))
#
#                 elif appflow_data['Id'] and not appflow_data['IsDeleted']:
#                     for endpoint_type in ['Email', 'MobilePhone', 'HomePhone']:
#                         channel_endpoint = prepare_channel_endpoint(endpoint, endpoint_type, appflow_data['Id'])
#                         if endpoint_type in appflow_data and appflow_data[endpoint_type] is not None:
#                             addr = appflow_data[endpoint_type] if endpoint_type == 'Email' else verify_phone_address(appflow_data[endpoint_type])
#                             if addr is not None:
#                                 channel_endpoint['Address'] = addr
#                                 send_endpoint(channel_endpoint, '%s%s' % (appflow_data['Id'], endpoint_type))
#
#         except Exception as e:
#             logging.exception(e)
#
# def verify_phone_address(num):
#     # A missing/empty number cannot be validated; skip the API call
#     if not num:
#         return None
#     try:
#         r = pinpoint.phone_number_validate(
#             NumberValidateRequest={
#                 'PhoneNumber': num
#             }
#         )
#         return r['NumberValidateResponse']['CleansedPhoneNumberE164']
#     except ClientError as e:
#         logging.info('Could not validate phone number: %s' % (num))
#         return None
#
# def prepare_channel_endpoint(endpoint, et, salesforce_id):
#     channel_endpoint = copy.deepcopy(endpoint)
#     channel_endpoint['ChannelType'] = channel_types[et]
#     channel_endpoint['User']['UserId'] = salesforce_id
#     return channel_endpoint
#
# def send_endpoint(endpoint, id):
#     logging.info(endpoint)
#     response = pinpoint.update_endpoint(
#         ApplicationId=os.environ['PINPOINT_PROJECT_ID'],
#         EndpointId=id,
#         EndpointRequest=endpoint
#     )
#     logging.info(response)
#
# def recursively_add_user_attrs(endpoint, appflow_data, field):
#     if '.' in field:
#         [sub_field, new_field] = field.split('.', 1)
#         # Only descend into compound fields that are actually present (not null)
#         if isinstance(appflow_data.get(sub_field), dict):
#             recursively_add_user_attrs(endpoint, appflow_data[sub_field], new_field)
#
#     elif field in appflow_data and appflow_data[field] is not None:
#         endpoint['User']['UserAttributes'][field] = [appflow_data[field]]
#
#     return endpoint

  EventProcessingLambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - "sts:AssumeRole"
      Path: "/"

  EventProcessingLambdaRolePolicy:
    Type: AWS::IAM::Policy
    Properties:
      Roles:
        - !Ref EventProcessingLambdaRole
      PolicyName: "root"
      PolicyDocument:
        Version: "2012-10-17"
        Statement:
          # Upsert endpoints in the target Pinpoint project only.
          - Effect: "Allow"
            Action:
              - "mobiletargeting:UpdateEndpointsBatch"
              - "mobiletargeting:UpdateEndpoint"
            Resource: !Sub
              - 'arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${ProjectId}*'
              - {ProjectId: !If [NeedsPinpointProjectId, !Ref PinpointApplication, !Ref PinpointProjectId]}
          # Phone number validation used to normalise SMS/voice addresses.
          - Effect: "Allow"
            Action: "mobiletargeting:PhoneNumberValidate"
            Resource: !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:phone/number/validate"
          # Read the AppFlow output objects under the contacts prefix.
          - Effect: "Allow"
            Action:
              - "s3:GetBucketLocation"
              - "s3:GetObject"
              - "s3:ListBucket"
            Resource:
              - !Sub "arn:aws:s3:::${AppFlowS3Bucket}"
              - !Sub "arn:aws:s3:::${AppFlowS3Bucket}/${S3ContactsPrefix}*"
          - Effect: "Allow"
            Action:
              - "logs:CreateLogGroup"
              - "logs:CreateLogStream"
              - "logs:PutLogEvents"
            Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*"

  # Allows the AppFlow bucket (in this account only) to invoke the processing Lambda.
  S3InvokeLambdaPermission:
    Type: AWS::Lambda::Permission
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !Ref EventProcessingLambda
      Principal: s3.amazonaws.com
      SourceArn: !Sub "arn:aws:s3:::${AppFlowS3Bucket}"
      SourceAccount: !Ref "AWS::AccountId"

  # Custom resource that configures the S3 -> Lambda notification on the bucket.
  # The invoke permission must exist first, otherwise S3 rejects the notification.
  LambdaTrigger:
    Type: 'Custom::LambdaTrigger'
    DependsOn: S3InvokeLambdaPermission
    Properties:
      ServiceToken: !GetAtt CustomResourceLambdaFunction.Arn
      LambdaArn: !GetAtt EventProcessingLambda.Arn
      Bucket: !Ref AppFlowS3Bucket
      Prefix: !Ref S3ContactsPrefix

  CustomResourceLambdaFunction:
    Type: 'AWS::Lambda::Function'
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt CustomResourceLambdaFunctionRole.Arn
      Code:
        ZipFile: |
          import json
          import boto3
          import cfnresponse
          import logging

          # The only status values CloudFormation accepts from a custom resource.
          SUCCESS = "SUCCESS"
          FAILED = "FAILED"

          s3 = boto3.resource('s3')

          def lambda_handler(event, context):
              """Create/update or remove the bucket notification and report back to CloudFormation."""
              logging.getLogger().setLevel('INFO')
              logging.info("Received event: " + json.dumps(event, indent=2))
              responseData = {}
              try:
                  if event['RequestType'] == 'Delete':
                      Bucket = event['ResourceProperties']['Bucket']
                      delete_notification(Bucket)
                      logging.info("Sending response to custom resource after Delete")
                  elif event['RequestType'] == 'Create' or event['RequestType'] == 'Update':
                      LambdaArn = event['ResourceProperties']['LambdaArn']
                      Bucket = event['ResourceProperties']['Bucket']
                      Prefix = event['ResourceProperties']['Prefix']
                      add_notification(LambdaArn, Bucket, Prefix)
                      responseData = {'Bucket': Bucket}
                      logging.info("Sending response to custom resource")
                  responseStatus = SUCCESS
              except Exception as e:
                  logging.exception(e)
                  responseStatus = FAILED
                  responseData = {'Failure': 'Something bad happened.'}
              # Reuse the physical ID CloudFormation already knows (absent on Create, in
              # which case cfnresponse falls back to its default). Returning a different
              # ID on Update would be treated as a replacement, and the follow-up Delete
              # for the old ID would clear the notification that was just written.
              cfnresponse.send(event, context, responseStatus, responseData, event.get('PhysicalResourceId'))

          def add_notification(LambdaArn, Bucket, Prefix):
              """Point s3:ObjectCreated:* events under Prefix at the given Lambda."""
              bucket_notification = s3.BucketNotification(Bucket)
              response = bucket_notification.put(
                  NotificationConfiguration={
                      'LambdaFunctionConfigurations': [
                          {
                              'LambdaFunctionArn': LambdaArn,
                              'Events': [
                                  's3:ObjectCreated:*'
                              ],
                              'Filter': {
                                  'Key': {
                                      'FilterRules': [{
                                          'Name': 'prefix',
                                          'Value': Prefix
                                      }]
                                  }
                              }
                          }
                      ]
                  }
              )
              logging.info("Put request completed....")

          def delete_notification(Bucket):
              """Clear the bucket's notification configuration."""
              bucket_notification = s3.BucketNotification(Bucket)
              response = bucket_notification.put(
                  NotificationConfiguration={}
              )
              logging.info("Delete request completed....")
      # python3.7 is a deprecated Lambda runtime; use a supported one.
      Runtime: python3.12
      Timeout: 50

  CustomResourceLambdaFunctionRole:
    Type: 'AWS::IAM::Role'
    Properties:
      AssumeRolePolicyDocument:
        Version: "2012-10-17"
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - lambda.amazonaws.com
            Action:
              - 'sts:AssumeRole'
      Path: /
      Policies:
        - PolicyName: root
          PolicyDocument:
            Version: "2012-10-17"
            Statement:
              - Effect: Allow
                Action:
                  - 's3:GetBucketNotification'
                  - 's3:PutBucketNotification'
                Resource: !Sub 'arn:aws:s3:::${AppFlowS3Bucket}'
              - Effect: Allow
                Action:
                  - 'logs:CreateLogGroup'
                  - 'logs:CreateLogStream'
                  - 'logs:PutLogEvents'
                Resource: 'arn:aws:logs:*:*:*'