AWSTemplateFormatVersion: 2010-09-09 Description: Amazon S3 Triggered Amazon Pinpoint Import Parameters: PinpointProjectId: Type: String Description: Amazon Pinpoint Project ID to import into FileDropS3Bucket: Type: String Description: Name of the existing Amazon S3 Bucket where new import files will be placed. FileDropS3Prefix: Type: String Description: Prefix of the Amazon S3 Bucket where new import files will be placed. ImportFileFormat: Type: String Default: 'CSV' AllowedValues: - CSV - JSON Description: Indicates format of import file. CSV or JSON Resources: ## State Machine Lambdas ImportSegmentLambda: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Role: !GetAtt ImportSegmentLambdaRole.Arn Runtime: python3.7 Timeout: 60 Environment: Variables: LOG_LEVEL: "INFO" APPLICATION_ID: !Ref PinpointProjectId ROLE_ARN: !GetAtt PinpointImportRole.Arn FILE_FORMAT: !Ref ImportFileFormat Code: ZipFile: | import boto3 import logging import traceback import time import os from urllib.parse import urlparse client = boto3.client('pinpoint') def lambda_handler(event, context): global log_level log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'ERROR' logging.getLogger().setLevel(log_level) logging.info(event) s3url = event['S3URL'] filename = os.path.basename(s3url) response = client.create_import_job( ApplicationId=os.environ.get('APPLICATION_ID'), ImportJobRequest={ 'DefineSegment': True, 'Format': os.environ.get('FILE_FORMAT'), 'RoleArn': os.environ.get('ROLE_ARN'), 'S3Url': s3url, 'SegmentName': filename } ) logging.info(response) return { 'ImportId': response['ImportJobResponse']['Id'], 'SegmentId': response['ImportJobResponse']['Definition']['SegmentId'], 'ExternalId': response['ImportJobResponse']['Definition']['ExternalId'], } ImportSegmentLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "mobiletargeting:GetSegmentVersion" - "mobiletargeting:GetSegment" - "mobiletargeting:GetSegments" - "mobiletargeting:GetSegmentVersions" - "mobiletargeting:CreateImportJob" Resource: - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}*" - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}" - Effect: "Allow" Action: "iam:PassRole" Resource: - !GetAtt PinpointImportRole.Arn PinpointImportRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - pinpoint.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "s3:Get*" - "s3:List*" Resource: - !Sub "arn:aws:s3:::${FileDropS3Bucket}*" - !Sub "arn:aws:s3:::${FileDropS3Bucket}" ImportSegmentStatusLambda: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Role: !GetAtt ImportSegmentStatusLambdaRole.Arn Runtime: python3.7 Timeout: 60 Environment: Variables: APPLICATION_ID: !Ref PinpointProjectId Code: ZipFile: | import boto3 import time import os import logging import traceback import json client = boto3.client('pinpoint') def lambda_handler(event, context): global log_level log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'ERROR' logging.getLogger().setLevel(log_level) logging.info(event) response = client.get_import_job( ApplicationId=os.environ.get('APPLICATION_ID'), JobId=event['ImportId'] ) logging.info(response) return { 'ImportId': response['ImportJobResponse']['Id'], 'SegmentId': response['ImportJobResponse']['Definition']['SegmentId'], 'ExternalId': response['ImportJobResponse']['Definition']['ExternalId'], 'Status': response['ImportJobResponse']['JobStatus'], 'ResponseFormatted': response['ImportJobResponse'] } ImportSegmentStatusLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "mobiletargeting:GetImportJob" Resource: - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}/jobs/import/*" ## State Machine PinpointImportNotificationTopic: Type: AWS::SNS::Topic Properties: DisplayName: 'PinpointImportNotifications' KmsMasterKeyId: alias/aws/sns ImportStateMachine: Type: AWS::StepFunctions::StateMachine Properties: RoleArn: !GetAtt ImportStateMachineRole.Arn DefinitionString: !Sub - |- { "StartAt": "SendStartNotification", "States": { "SendStartNotification": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "${SNSTopicArn}", "Message": { "Message": "Import Started", "Input.$": "$" }, "Subject": "Amazon Pinpoint Import Started", "MessageAttributes": { "notification_type": { "DataType": "String", "StringValue": "started" } } }, "ResultPath": null, "Next": "ImportSegment" }, "ImportSegment": { "Type": "Task", "Resource": "${ImportSegmentArn}", "Next": "ImportSegmentWait" }, "ImportSegmentWait": { "Type": "Wait", "Seconds": 5, "Next": "ImportSegmentStatus" }, "ImportSegmentStatus": { "Type": "Task", "Resource": "${ImportSegmentStatusArn}", "Next": "IsImportSegmentFinished" }, "IsImportSegmentFinished": { "Type": "Choice", "Default": "ImportSegmentWait", "Choices": [ { "Variable": "$.Status", "StringEquals": "FAILED", "Next": "ImportSegmentFailed" }, { "Variable": "$.Status", "StringEquals": "COMPLETED", "Next": "ImportSegmentSuccess" } ] }, "ImportSegmentSuccess": { "Type": "Pass", "Parameters": { "ImportOutput": { "ImportId.$": "$.ImportId", "SegmentId.$": "$.SegmentId", "ExternalId.$": "$.ExternalId", "Status.$": "$.Status", "ResponseFormatted.$": "$.ResponseFormatted", "ImportId.$": "$.ImportId" }, "ImportInput.$": "$$.Execution.Input.S3URL" }, "Next": "EmitSuccess" }, "EmitSuccess": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "ResultPath": null, "Parameters": { "TopicArn": "${SNSTopicArn}", "Message": { "Message": "Import Successful", "ImportResult.$": "$.ImportOutput.ResponseFormatted" }, "Subject": "Amazon Pinpoint Import Successful", "MessageAttributes": { "notification_type": { "DataType": "String", "StringValue": "success" } } }, "End": true }, "ImportSegmentFailed": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "${SNSTopicArn}", "Message": { "Message": "Import Failed", "ImportResult.$": "$.ResponseFormatted" }, "Subject": "Amazon Pinpoint Import Failed", "MessageAttributes": { "notification_type": { "DataType": "String", "StringValue": "failure" } } }, "End": true } } } - {ImportSegmentArn: !GetAtt ImportSegmentLambda.Arn, ImportSegmentStatusArn: !GetAtt ImportSegmentStatusLambda.Arn, SNSTopicArn: !Ref PinpointImportNotificationTopic} ImportStateMachineRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "states.amazonaws.com" Action: - "sts:AssumeRole" Path: "/" ManagedPolicyArns: - "arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: "lambda:InvokeFunction" Resource: - !GetAtt ImportSegmentLambda.Arn - !GetAtt ImportSegmentStatusLambda.Arn - Effect: "Allow" Action: sns:Publish Resource: !Ref PinpointImportNotificationTopic ## S3 Trigger Lambda S3NotificationLambdaFunction: Type: AWS::Lambda::Function Properties: Handler: index.lambda_handler Role: !GetAtt S3NotificationLambdaFunctionRole.Arn Runtime: python3.7 Timeout: 30 Environment: Variables: LOG_LEVEL: "INFO" STATE_MACHINE_ARN: !Ref ImportStateMachine Code: ZipFile: | import boto3 import time import json import os import logging import traceback client = boto3.client('stepfunctions') def lambda_handler(event, context): global log_level log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'ERROR' logging.getLogger().setLevel(log_level) logging.info(event) try: for record in event['Records']: s3url = 's3://' + record['s3']['bucket']['name'] + '/' + record['s3']['object']['key'] response = client.start_execution( stateMachineArn = os.environ.get('STATE_MACHINE_ARN'), name = 'import_run-' + time.strftime("%Y%m%d-%H%M%S"), input = json.dumps({ 'S3URL': s3url }) ) logging.info(response) return True except Exception as error: logging.error('lambda_handler error: %s' % (error)) logging.error('lambda_handler trace: %s' % traceback.format_exc()) result = { 'statusCode': '500', 'body': {'message': 'error'} } return json.dumps(result) LambdaInvokePermission: Type: AWS::Lambda::Permission Properties: FunctionName: !GetAtt S3NotificationLambdaFunction.Arn Action: lambda:InvokeFunction Principal: s3.amazonaws.com SourceAccount: !Ref 'AWS::AccountId' SourceArn: !Sub 'arn:aws:s3:::${FileDropS3Bucket}' S3NotificationLambdaFunctionRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: / Policies: - PolicyName: root PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: states:StartExecution Resource: !Ref ImportStateMachine - Effect: Allow Action: - 'logs:CreateLogGroup' - 'logs:CreateLogStream' - 'logs:PutLogEvents' Resource: 'arn:aws:logs:*:*:*' ## Custom Lambda Resource to add a Lambda Trigger to # See: https://aws.amazon.com/premiumsupport/knowledge-center/cloudformation-s3-notification-lambda/ CustomResourceLambdaFunction: Type: 'AWS::Lambda::Function' Properties: Handler: index.lambda_handler Role: !GetAtt CustomResourceLambdaFunctionRole.Arn Runtime: python3.7 Timeout: 50 Code: ZipFile: | from __future__ import print_function import json import boto3 import cfnresponse SUCCESS = "SUCCESS" FAILED = "FAILED" print('Loading function') s3 = boto3.resource('s3') def lambda_handler(event, context): print("Received event: " + json.dumps(event, indent=2)) responseData={} try: if event['RequestType'] == 'Delete': print("Request Type:",event['RequestType']) Bucket=event['ResourceProperties']['Bucket'] delete_notification(Bucket) print("Sending response to custom resource after Delete") elif event['RequestType'] == 'Create' or event['RequestType'] == 'Update': print("Request Type:",event['RequestType']) LambdaArn=event['ResourceProperties']['LambdaArn'] Bucket=event['ResourceProperties']['Bucket'] Prefix=event['ResourceProperties']['Prefix'] add_notification(LambdaArn, Bucket, Prefix) responseData={'Bucket':Bucket} print("Sending response to custom resource") responseStatus = 'SUCCESS' except Exception as e: print('Failed to process:', e) responseStatus = 'FAILURE' responseData = {'Failure': 'Something bad happened.'} cfnresponse.send(event, context, responseStatus, responseData) def add_notification(LambdaArn, Bucket, Prefix): bucket_notification = s3.BucketNotification(Bucket) response = bucket_notification.put( NotificationConfiguration={ 'LambdaFunctionConfigurations': [ { 'LambdaFunctionArn': LambdaArn, 'Events': [ 's3:ObjectCreated:*' ], 'Filter': { 'Key': { 'FilterRules': [{ 'Name': 'prefix', 'Value': Prefix }] } } } ] } ) print("Put request completed....") def delete_notification(Bucket): bucket_notification = s3.BucketNotification(Bucket) response = bucket_notification.put( NotificationConfiguration={} ) print("Delete request completed....") CustomResourceLambdaFunctionRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - sts:AssumeRole Path: / Policies: - PolicyName: root PolicyDocument: Version: '2012-10-17' Statement: - Effect: Allow Action: - s3:GetBucketNotification - s3:PutBucketNotification Resource: - !Sub "arn:aws:s3:::${FileDropS3Bucket}" - !Sub "arn:aws:s3:::${FileDropS3Bucket}/*" - Effect: Allow Action: - logs:CreateLogGroup - logs:CreateLogStream - logs:PutDestination - logs:PutLogEvents Resource: - !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" LambdaTrigger: Type: 'Custom::LambdaTrigger' DependsOn: LambdaInvokePermission Properties: ServiceToken: !GetAtt CustomResourceLambdaFunction.Arn LambdaArn: !GetAtt S3NotificationLambdaFunction.Arn Bucket: !Ref FileDropS3Bucket Prefix: !Ref FileDropS3Prefix Outputs: ImportStatusSNSTopicArn: Description: SNS Topic used to provide updates to status Value: !Ref PinpointImportNotificationTopic