AWSTemplateFormatVersion: 2010-09-09 Description: Amazon S3 Triggered Amazon Pinpoint Import and Campaign creation (with optional phone validation) Metadata: AWS::CloudFormation::Interface: ParameterGroups: - Label: default: "Required Parameters" Parameters: - PinpointProjectId - FileDropS3Bucket - FileDropS3Prefix - Label: default: "Optional Parameters" Parameters: - AutoCreateCampaign - ValidatePhone - AssumeUS - CampaignDelay Parameters: PinpointProjectId: Type: String Description: Amazon Pinpoint Project ID to import into. Required! FileDropS3Bucket: Type: String Description: Name of the EXISTING Amazon S3 Bucket where new import files will be placed. Note that it has to be in the same region as you are running this template and the bucket should not have any existing notification configurations or they will be overwritten. FileDropS3Prefix: Type: String Default: import Description: Prefix (sub-folder name) of the Amazon S3 Bucket where new import files will be placed. Required! CampaignDelay: Type: Number Default: 15 Description: Number of minutes from the time of import to send the campaign. Allows for the last minute double check and/or pause as needed. Will be saved as CreateCampaign Lambda environment variable. AutoCreateCampaign: Type: String Default: False AllowedValues: - True - False Description: Choose True if you want to automatically create a campaign based on the imported file or False if you want to just import into the system. Default is False. Will be saved as ImportSegment Lambda environment variable. ValidatePhone: Type: String Default: True AllowedValues: - True - False Description: Choose TRUE if you want to use Pinpoint PhoneValidate functionality or FALSE if you want to import as-is. Default is TRUE. AssumeUS: Type: String Default: True AllowedValues: - True - False Description: Enter TRUE if you want to assume US (+1) phone number for any phone 10 digits long or FALSE if you want to import as-is. Default is TRUE. Resources: ## State Machine Lambdas ##Validate Lambda ImportSegmentLambdaValidate: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Role: !GetAtt ImportSegmentLambdaValidateRole.Arn Runtime: python3.8 Timeout: 60 Environment: Variables: LOG_LEVEL: "INFO" ASSUME_US: !Ref AssumeUS Code: ZipFile: | import csv import json import logging import os import boto3 def validatePinpoint(user_document_old): pinpointClient = boto3.client('pinpoint') if os.environ["ASSUME_US"].lower() != "false": assume_US = True else: assume_US = False user_document = json.loads(user_document_old) logging.info(user_document) for i in user_document: try: if assume_US == True and getNumeric(user_document[i]['Address']) == 10: logging.info("Assuming US number for " + user_document[i]['Address']) user_document[i]['Address'] = "+1"+user_document[i]['Address'] response = pinpointClient.phone_number_validate( NumberValidateRequest={ 'PhoneNumber': user_document[i]['Address'] } ) response_json = response['NumberValidateResponse'] except Exception as error: logging.error('validatePinpoint error: %s' % (error)) logging.error('validatePinpoint trace: %s' % traceback.format_exc()) response_json = {} response_json['CountryCodeIso2'] = 'XX' response_json['OriginalPhoneNumber'] = user_document[i]['Address'] logging.info(response_json) user_document[i]['CleansedPhoneNumberE164'] = getValue( 'CleansedPhoneNumberE164', response_json) user_document[i]['CountryCodeIso2'] = response_json['CountryCodeIso2'] user_document[i]['OriginalPhoneNumber'] = response_json['OriginalPhoneNumber'] user_document[i]['Timezone'] = getValue('Timezone', response_json) user_document[i]['ZipCode'] = getValue('ZipCode', response_json) user_document[i]['Carrier'] = getValue('Carrier', response_json) user_document[i]['PhoneTypeCode'] = getValue( 'PhoneTypeCode', response_json) user_document[i]['PhoneType'] = getValue('PhoneType', response_json) return user_document def getNumeric(str): import re temp = re.findall(r'\d+', str) res = ''.join(temp) return len(res) def getValue(name, field): return field[name] if name in field else "UNKNOWN" def load_csv_from_s3(s3_data_bucket, s3_data_key): bucket, key = s3_data_bucket, s3_data_key s3 = boto3.resource('s3') obj = s3.Object(bucket, key) response = obj.get() lines = response['Body'].read().decode('utf-8-sig').splitlines() segment_records = {} count = 0 for row in csv.DictReader(lines): segment_records[count] = row count += 1 segment_records = json.dumps(segment_records) return segment_records def lambda_handler(event, context): s3url = event['S3URL'] s3bucket = event['S3Bucket'] s3key = event['S3Key'] filename, extension = os.path.splitext(os.path.basename(s3url)) user_document = {} user_document = load_csv_from_s3(s3bucket, s3key) user_document_valid_phoneNumbers = validatePinpoint(user_document) return user_document_valid_phoneNumbers ImportSegmentLambdaValidateRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "rootValidate" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "s3:PutObject" - "s3:Get*" - "s3:List*" Resource: - !Sub "arn:aws:s3:::${FileDropS3Bucket}*" - !Sub "arn:aws:s3:::${FileDropS3Bucket}" - Effect: "Allow" Action: - "mobiletargeting:GetSegmentVersion" - "mobiletargeting:GetSegment" - "mobiletargeting:GetSegments" - "mobiletargeting:GetSegmentVersions" - "mobiletargeting:CreateImportJob" - "mobiletargeting:PhoneNumberValidate" Resource: - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}*" - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}" - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:phone/number/validate" - Effect: "Allow" Action: "iam:PassRole" Resource: - !GetAtt PinpointImportRole.Arn ##end ## Save Validate ImportSegmentLambdaSave: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Role: !GetAtt ImportSegmentLambdaSaveRole.Arn Runtime: python3.8 Timeout: 60 Environment: Variables: LOG_LEVEL: "INFO" Code: ZipFile: | import csv import json import logging import os import boto3 def save_validate_phone_results(pinpointResponse, s3_data_bucket, original_filename): import datetime timestamp_date = str(datetime.datetime.now().strftime("%Y%m%d")) pinpointResultFile = 'tmp/'+original_filename+"_"+timestamp_date+'.csv' data_file = open("/"+pinpointResultFile, 'w') csv_writer = csv.writer(data_file) count = 0 for emp in pinpointResponse['records']: if count == 0: header = emp.keys() csv_writer.writerow(header) count += 1 csv_writer.writerow(emp.values()) data_file.close() s3_client = boto3.client('s3') s3_client.upload_file("/"+pinpointResultFile, s3_data_bucket, pinpointResultFile) return "s3://"+s3_data_bucket+"/"+pinpointResultFile def lambda_handler(event, context): response=save_validate_phone_results(event['segment_valid_structure'],event['s3bucket'],event['filename']) return response ImportSegmentLambdaSaveRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "rootSave" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "s3:PutObject" - "s3:Get*" - "s3:List*" Resource: - !Sub "arn:aws:s3:::${FileDropS3Bucket}*" - !Sub "arn:aws:s3:::${FileDropS3Bucket}" - Effect: "Allow" Action: - "mobiletargeting:GetSegmentVersion" - "mobiletargeting:GetSegment" - "mobiletargeting:GetSegments" - "mobiletargeting:GetSegmentVersions" - "mobiletargeting:CreateImportJob" - "mobiletargeting:PhoneNumberValidate" Resource: - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}*" - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}" - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:phone/number/validate" - Effect: "Allow" Action: "iam:PassRole" Resource: - !GetAtt PinpointImportRole.Arn ##end ImportSegmentLambda: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Role: !GetAtt ImportSegmentLambdaRole.Arn Runtime: python3.8 Timeout: 60 Environment: Variables: LOG_LEVEL: "INFO" APPLICATION_ID: !Ref PinpointProjectId ROLE_ARN: !GetAtt PinpointImportRole.Arn FILE_FORMAT: "CSV" CREATE_CAMPAIGN: !Ref AutoCreateCampaign VALIDATE_PHONE: !Ref ValidatePhone ValidateFunction: !GetAtt ImportSegmentLambdaValidate.Arn SaveValidateResult: !GetAtt ImportSegmentLambdaSave.Arn Code: ZipFile: | import json import logging import os import boto3 import typing def invokeLambdaFunction(*, functionName:str=None, payload:typing.Mapping[str, str]=None): if functionName == None: raise Exception('ERROR: functionName parameter required') payloadBytesArr = bytes(json.dumps(payload), encoding='utf8') client = boto3.client('lambda') response = client.invoke( FunctionName=functionName, InvocationType="RequestResponse", Payload=payloadBytesArr ) return json.load(response['Payload']) def load_csv_from_s3(s3_data_bucket, s3_data_key): bucket, key = s3_data_bucket, s3_data_key s3 = boto3.resource('s3') obj = s3.Object(bucket, key) response = obj.get() lines = response['Body'].read().decode('utf-8-sig').splitlines() segment_records = {} count = 0 for row in csv.DictReader(lines): segment_records[count] = row count += 1 segment_records = json.dumps(segment_records) return segment_records def map_to_segment(segment_local_file): import copy value_list = [] for new_key in range(0, len(segment_local_file)): valid_json = {} str_key = str(new_key) for key, value in segment_local_file[str_key].items(): if key == 'PhoneType': if value == 'INVALID' or segment_local_file[str_key]['CleansedPhoneNumberE164'] == "UNKNOWN": valid_json['Attributes.Endpoint'] = 'INVALID' valid_json['OptOut'] = 'ALL' valid_json['Address'] = segment_local_file[str_key]['OriginalPhoneNumber'] valid_json['Location.Country'] = "XX" else: valid_json['Attributes.Endpoint'] = 'VALID' valid_json['OptOut'] = segment_local_file[str_key]["OptOut"] if "OptOut" in segment_local_file[str_key] else "NONE" valid_json['Address'] = segment_local_file[str_key]['CleansedPhoneNumberE164'] valid_json['Location.Country'] = segment_local_file[str_key][ 'CountryCodeIso2'] if segment_local_file[str_key]['CountryCodeIso2'] != "UNKNOWN" else "XX" name = key if 'Attributes.' not in key and key not in ["Address", "ChannelType", "OptOut", "null"]: name = 'Attributes.'+key if key != "null": valid_json[name] = value value_json_new = copy.deepcopy(valid_json) value_list.append(value_json_new) return_json = {"records": value_list} return return_json def lambda_handler(event, context): if os.environ["VALIDATE_PHONE"].lower() != "false": validate_phone = True else: validate_phone = False s3url = event['S3URL'] s3bucket = event['S3Bucket'] filename, extension = os.path.splitext(os.path.basename(s3url)) if validate_phone: user_document_valid_phoneNumbers = invokeLambdaFunction( functionName=os.environ["ValidateFunction"], payload=event) segment_valid_structure = map_to_segment( user_document_valid_phoneNumbers) event_save = {"segment_valid_structure": segment_valid_structure, "s3bucket": s3bucket, "filename": filename} s3url = invokeLambdaFunction( functionName=os.environ["SaveValidateResult"], payload=event_save) client = boto3.client('pinpoint') response = client.create_import_job( ApplicationId=os.environ.get('APPLICATION_ID'), ImportJobRequest={ 'DefineSegment': True, 'Format': os.environ.get('FILE_FORMAT'), 'RoleArn': os.environ.get('ROLE_ARN'), 'S3Url': s3url, 'SegmentName': filename } ) return { 'ImportId': response['ImportJobResponse']['Id'], 'SegmentId': response['ImportJobResponse']['Definition']['SegmentId'], 'ExternalId': response['ImportJobResponse']['Definition']['ExternalId'], 'create_campaign': os.environ["CREATE_CAMPAIGN"].lower() } ImportSegmentLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "s3:PutObject" - "s3:Get*" - "s3:List*" Resource: - !Sub "arn:aws:s3:::${FileDropS3Bucket}*" - !Sub "arn:aws:s3:::${FileDropS3Bucket}" - Effect: "Allow" Action: - "mobiletargeting:GetSegmentVersion" - "mobiletargeting:GetSegment" - "mobiletargeting:GetSegments" - "mobiletargeting:GetSegmentVersions" - "mobiletargeting:CreateImportJob" - "mobiletargeting:PhoneNumberValidate" Resource: - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}*" - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}" - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:phone/number/validate" - Effect: "Allow" Action: "iam:PassRole" Resource: - !GetAtt PinpointImportRole.Arn - Effect: "Allow" Action: - "lambda:InvokeFunction" Resource: - !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${ImportSegmentLambdaValidate}" - !Sub "arn:aws:lambda:${AWS::Region}:${AWS::AccountId}:function:${ImportSegmentLambdaSave}" PinpointImportRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - pinpoint.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "s3:Get*" - "s3:List*" Resource: - !Sub "arn:aws:s3:::${FileDropS3Bucket}*" - !Sub "arn:aws:s3:::${FileDropS3Bucket}" ImportSegmentStatusLambda: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Role: !GetAtt ImportSegmentStatusLambdaRole.Arn Runtime: python3.8 Timeout: 60 Environment: Variables: APPLICATION_ID: !Ref PinpointProjectId Code: ZipFile: | import boto3 import time import os import logging import traceback import json client = boto3.client('pinpoint') def lambda_handler(event, context): global log_level log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'ERROR' logging.getLogger().setLevel(log_level) logging.info(event) create_campaign = event['create_campaign'] response = client.get_import_job( ApplicationId=os.environ.get('APPLICATION_ID'), JobId=event['ImportId'] ) logging.info(response) return { 'ImportId': response['ImportJobResponse']['Id'], 'SegmentId': response['ImportJobResponse']['Definition']['SegmentId'], 'ExternalId': response['ImportJobResponse']['Definition']['ExternalId'], 'Status': response['ImportJobResponse']['JobStatus'], 'ResponseFormatted': response['ImportJobResponse'], 'create_campaign' : create_campaign } ImportSegmentStatusLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "mobiletargeting:GetImportJob" Resource: - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}/jobs/import/*" ##Create Campaign CreateCampaignLambda: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Role: !GetAtt CreateCampaignLambdaRole.Arn Runtime: python3.8 Timeout: 60 Environment: Variables: LOG_LEVEL: "INFO" CAMPAIGN_DELAY: !Ref CampaignDelay Code: ZipFile: | import boto3 import datetime import json import os import logging import traceback client = boto3.client('pinpoint') def lambda_handler(event, context): global log_level log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'ERROR' logging.getLogger().setLevel(log_level) logging.info(event) delay = int(os.environ.get('CAMPAIGN_DELAY')) segment_id = event['SegmentId'] segment_name = event['ResponseFormatted']['Definition']['SegmentName'] application_id = event['ResponseFormatted']['ApplicationId'] import_info = event['ResponseFormatted'] StartTime = (datetime.datetime.now() + datetime.timedelta(minutes=delay)).isoformat() try: response = client.create_campaign( ApplicationId=application_id, WriteCampaignRequest={ "HoldoutPercent": 0, "IsPaused": False, "MessageConfiguration": { "SMSMessage": { "Body": "This is just a test...", "MessageType": "TRANSACTIONAL" } }, "Name": segment_name, "Schedule": { "IsLocalTime": False, "StartTime": StartTime, "Frequency": "ONCE", "Timezone": "UTC" }, "SegmentId": segment_id } ) logging.info(response) return { 'CampaignId': response['CampaignResponse']['Id'], 'SegmentId': response['CampaignResponse']['SegmentId'], 'CampaignState': response['CampaignResponse']['State']['CampaignStatus'], 'ImportInfo': import_info } except Exception as error: logging.error('lambda_handler error: %s' % (error)) logging.error('lambda_handler trace: %s' % traceback.format_exc()) result = { 'statusCode': '500', 'body': {'message': 'error'} } return json.dumps(result) CreateCampaignLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "mobiletargeting:CreateCampaign" Resource: - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}" ## State Machine PinpointImportNotificationTopic: Type: AWS::SNS::Topic Properties: DisplayName: 'PinpointImportNotifications' KmsMasterKeyId: alias/aws/sns ImportStateMachine: Type: AWS::StepFunctions::StateMachine Properties: RoleArn: !GetAtt ImportStateMachineRole.Arn DefinitionString: !Sub - |- { "StartAt": "SendStartNotification", "States": { "SendStartNotification": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "${SNSTopicArn}", "Message": { "Message": "Import Started", "Input.$": "$" }, "Subject": "Amazon Pinpoint Import Started", "MessageAttributes": { "notification_type": { "DataType": "String", "StringValue": "started" } } }, "ResultPath": null, "Next": "ImportSegment" }, "ImportSegment": { "Type": "Task", "Resource": "${ImportSegmentArn}", "Next": "ImportSegmentWait" }, "ImportSegmentWait": { "Type": "Wait", "Seconds": 5, "Next": "ImportSegmentStatus" }, "ImportSegmentStatus": { "Type": "Task", "Resource": "${ImportSegmentStatusArn}", "Next": "IsImportSegmentFinished" }, "IsImportSegmentFinished": { "Type": "Choice", "Default": "ImportSegmentWait", "Choices": [ { "Variable": "$.Status", "StringEquals": "FAILED", "Next": "ImportFailed" }, { "And": [ { "Variable": "$.Status", "StringEquals": "COMPLETED" }, { "Variable": "$.create_campaign", "StringEquals": "True" } ], "Next": "CreateCampaign" }, { "And": [ { "Variable": "$.Status", "StringEquals": "COMPLETED" }, { "Not": { "Variable": "$.create_campaign", "StringEquals": "True" } } ], "Next": "ImportSuccess" } ] }, "CreateCampaign": { "Type": "Task", "Resource": "${CreateCampaignArn}", "Next": "IsCreateCampaignFinished" }, "IsCreateCampaignFinished": { "Type": "Choice", "Default": "ImportFailed", "Choices": [ { "Variable": "$.CampaignState", "StringEquals": "SCHEDULED", "Next": "ImportSuccess" }, { "Variable": "$.CampaignState", "StringEquals": "INVALID", "Next": "ImportFailed" } ] }, "ImportSuccess": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "${SNSTopicArn}", "Message": { "Message": "Import&Campaign Schedule Successful", "Result.$": "$" }, "Subject": "Amazon Pinpoint Import&Campaign Schedule Successful", "MessageAttributes": { "notification_type": { "DataType": "String", "StringValue": "success" } } }, "ResultPath": null, "End": true }, "ImportFailed": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "${SNSTopicArn}", "Message": { "Message": "Import or Campaign Failed", "Result.$": "$" }, "Subject": "Amazon Pinpoint Import or Campaign Failed", "MessageAttributes": { "notification_type": { "DataType": "String", "StringValue": "failure" } } }, "ResultPath": null, "End": true } } } - {ImportSegmentArn: !GetAtt ImportSegmentLambda.Arn, ImportSegmentStatusArn: !GetAtt ImportSegmentStatusLambda.Arn, CreateCampaignArn: !GetAtt CreateCampaignLambda.Arn, SNSTopicArn: !Ref PinpointImportNotificationTopic} ImportStateMachineRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "states.amazonaws.com" Action: - "sts:AssumeRole" Path: "/" ManagedPolicyArns: - "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: "lambda:InvokeFunction" Resource: - !GetAtt ImportSegmentLambda.Arn - !GetAtt ImportSegmentStatusLambda.Arn - !GetAtt CreateCampaignLambda.Arn - Effect: "Allow" Action: sns:Publish Resource: !Ref PinpointImportNotificationTopic ## S3 Trigger Lambda S3NotificationLambdaFunction: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Role: !GetAtt S3NotificationLambdaFunctionRole.Arn Runtime: python3.8 Timeout: 30 Environment: Variables: LOG_LEVEL: "INFO" STATE_MACHINE_ARN: !Ref ImportStateMachine Code: ZipFile: | import boto3 import time import json import os import logging import traceback client = boto3.client('stepfunctions') def lambda_handler(event, context): global log_level log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'ERROR' logging.getLogger().setLevel(log_level) logging.info(event) try: for record in event['Records']: s3url = 's3://' + \ record['s3']['bucket']['name'] + \ '/' + record['s3']['object']['key'] response = client.start_execution( stateMachineArn=os.environ.get('STATE_MACHINE_ARN'), name='import_run-' + time.strftime("%Y%m%d-%H%M%S"), input=json.dumps({ 'S3URL': s3url, 'S3Bucket': record['s3']['bucket']['name'], 'S3Key': record['s3']['object']['key'] }) ) logging.info(response) return True except Exception as error: logging.error('lambda_handler error: %s' % (error)) logging.error('lambda_handler trace: %s' % traceback.format_exc()) result = { 'statusCode': '500', 'body': {'message': 'error'} } return json.dumps(result) LambdaInvokePermission: Type: AWS::Lambda::Permission Properties: FunctionName: !GetAtt S3NotificationLambdaFunction.Arn Action: lambda:InvokeFunction Principal: s3.amazonaws.com SourceAccount: !Ref 'AWS::AccountId' SourceArn: !Sub 'arn:aws:s3:::${FileDropS3Bucket}' S3NotificationLambdaFunctionRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: / Policies: - PolicyName: root PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: states:StartExecution Resource: !Ref ImportStateMachine - Effect: Allow Action: - 'logs:CreateLogGroup' - 'logs:CreateLogStream' - 'logs:PutLogEvents' Resource: 'arn:aws:logs:*:*:*' ## Custom Lambda Resource to add a Lambda Trigger to # See: https://aws.amazon.com/premiumsupport/knowledge-center/cloudformation-s3-notification-lambda/ CustomResourceLambdaFunction: Type: 'AWS::Lambda::Function' Properties: Handler: index.lambda_handler Role: !GetAtt CustomResourceLambdaFunctionRole.Arn Runtime: python3.8 Timeout: 50 Code: ZipFile: | from __future__ import print_function import json import boto3 import cfnresponse SUCCESS = "SUCCESS" FAILED = "FAILED" print('Loading function') s3 = boto3.resource('s3') def lambda_handler(event, context): print("Received event: " + json.dumps(event, indent=2)) responseData={} try: if event['RequestType'] == 'Delete': print("Request Type:",event['RequestType']) Bucket=event['ResourceProperties']['Bucket'] delete_notification(Bucket) print("Sending response to custom resource after Delete") elif event['RequestType'] == 'Create' or event['RequestType'] == 'Update': print("Request Type:",event['RequestType']) LambdaArn=event['ResourceProperties']['LambdaArn'] Bucket=event['ResourceProperties']['Bucket'] Prefix=event['ResourceProperties']['Prefix'] add_notification(LambdaArn, Bucket, Prefix) responseData={'Bucket':Bucket} print("Sending response to custom resource") responseStatus = 'SUCCESS' except Exception as e: print('Failed to process:', e) responseStatus = 'FAILURE' responseData = {'Failure': 'Something bad happened.'} cfnresponse.send(event, context, responseStatus, responseData) def add_notification(LambdaArn, Bucket, Prefix): bucket_notification = s3.BucketNotification(Bucket) response = bucket_notification.put( NotificationConfiguration={ 'LambdaFunctionConfigurations': [ { 'LambdaFunctionArn': LambdaArn, 'Events': [ 's3:ObjectCreated:*' ], 'Filter': { 'Key': { 'FilterRules': [{ 'Name': 'prefix', 'Value': Prefix }] } } } ] } ) print("Put request completed....") def delete_notification(Bucket): bucket_notification = s3.BucketNotification(Bucket) response = bucket_notification.put( NotificationConfiguration={} ) print("Delete request completed....") CustomResourceLambdaFunctionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - s3:GetBucketNotification - s3:PutBucketNotification Resource: - !Sub "arn:aws:s3:::${FileDropS3Bucket}" - !Sub "arn:aws:s3:::${FileDropS3Bucket}/*" - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutDestination - logs:PutLogEvents Resource: - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" LambdaTrigger: Type: 'Custom::LambdaTrigger' DependsOn: LambdaInvokePermission Properties: ServiceToken: !GetAtt CustomResourceLambdaFunction.Arn LambdaArn: !GetAtt S3NotificationLambdaFunction.Arn Bucket: !Ref FileDropS3Bucket Prefix: !Ref FileDropS3Prefix Outputs: ImportStatusSNSTopic: Description: SNS Topic used to provide updates to status Value: !Ref PinpointImportNotificationTopic