AWSTemplateFormatVersion: 2010-09-09 Description: Federated Segmentation using Amazon Pinpoint and Amazon Athena - Master Template Parameters: PinpointProjectId: Type: String Description: Amazon Pinpoint project to import results in to QueryResultsS3Path: Type: String Description: Amazon S3 full path to folder where query results / Pinpoint imports are to be stored Mappings: SourceCode: Runtime: Python: "python3.7" Resources: PerformQueryValidation: Type: AWS::Lambda::Function Properties: Handler: lambda_function.lambda_handler Role: !GetAtt PerformQueryValidationRole.Arn Runtime: !FindInMap ["SourceCode", "Runtime", "Python"] Timeout: 60 Environment: Variables: LOG_LEVEL: INFO # change to WARN, ERROR or DEBUG as needed Code: ZipFile: | import os import logging import traceback import re import json reg_mutate = "SELECT[^,]*(?:as|AS)\s*\"?Id\"?\s*(\s*,[^,]*(?:as|AS)\s*\"(?:Attributes\.|User\.UserAttributes\.|Metrics\.)\w*\"\s*)+\s*FROM.*" reg_direct = "SELECT[^,]*(?:as|AS)\s*\"?Id\"?\s*(\s*,[^,]*(?:as|AS)\s*\"(?:Attributes\.|User\.UserAttributes\.|Metrics\.)\w*\"\s*)*\s*FROM.*" def lambda_handler(event, context): try: global log_level log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'ERROR' logging.getLogger().setLevel(log_level) logging.info(event) reg = None if event['Type'].lower() == 'direct': if 'SegmentName' not in event: raise Exception("SegmentName required when using Type 'direct'.") reg = reg_direct elif event['Type'].lower() == 'mutate': reg = reg_mutate else: raise Exception("Invalid Query Type parameter.") x = re.search(reg, event['Query']) if x == None: status = 'INVALID' status_message = 'Incoming query did not match required format: ' + reg else: status = 'VALID' status_message = 'Success' return { 'Query': event['Query'], 'SegmentId': event.get('SegmentId', 'none'), 'SegmentName': event.get('SegmentName', 'none'), 'Type': event['Type'], 'Status': status, 'StatusMessage': status_message } except Exception as error: logging.error('lambda_handler error: %s' % (error)) logging.error('lambda_handler trace: %s' % traceback.format_exc()) result = { 'Status': 'ERROR', 'Message': '%s' % (error) } return result PerformQueryValidationRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" InitiateQueryLambda: Type: AWS::Lambda::Function Properties: Handler: lambda_function.lambda_handler Role: !GetAtt InitiateQueryLambdaRole.Arn Runtime: !FindInMap ["SourceCode", "Runtime", "Python"] Timeout: 60 Environment: Variables: LOG_LEVEL: INFO # change to WARN, ERROR or DEBUG as needed QUERY_RESULTS_PATH: !Ref QueryResultsS3Path Code: ZipFile: | mport boto3 import time import os import logging import traceback import json client = boto3.client('athena') def lambda_handler(event, context): try: global log_level log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'ERROR' logging.getLogger().setLevel(log_level) logging.info(event) response = client.start_query_execution( QueryString=event['Query'], ResultConfiguration={ 'OutputLocation': os.environ.get('QUERY_RESULTS_PATH') } ) logging.info(response) return { 'Type': event['Type'], 'SegmentId': event['SegmentId'], 'SegmentName': event['SegmentName'], 'QueryExecutionId': response['QueryExecutionId'] } except Exception as error: logging.error('lambda_handler error: %s' % (error)) logging.error('lambda_handler trace: %s' % traceback.format_exc()) result = { 'Status': 'ERROR', 'Message': '%s' % (error) } return result InitiateQueryLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "s3:GetBucketLocation" - "s3:GetObject" - "s3:ListBucket" - "s3:ListBucketMultipartUploads" - "s3:ListMultipartUploadParts" - "s3:AbortMultipartUpload" - "s3:CreateBucket" - "s3:PutObject" Resource: - !Sub - arn:aws:s3:::${Bucket} - { Bucket: !Select [2 , !Split [ "/", !Ref QueryResultsS3Path ] ] } - !Sub - arn:aws:s3:::${BucketPath} - { BucketPath: !Select [1, !Split [ "//" , !Ref QueryResultsS3Path ] ] } - !Sub - arn:aws:s3:::${BucketPath}* - { BucketPath: !Select [1, !Split [ "//" , !Ref QueryResultsS3Path ] ] } - Effect: "Allow" Action: - "athena:StartQueryExecution" - "athena:GetNamedQuery" Resource: !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/*" - Effect: "Allow" Action: - "athena:GetDataCatalog" Resource: - !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:datacatalog/*" - Effect: "Allow" Action: - "glue:GetDatabase" - "glue:GetDatabases" - "glue:GetTable" - "glue:GetTables" - "glue:GetPartition" - "glue:GetPartitions" Resource: - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/*/*" - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/*" - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog" QueryStatusLambda: Type: AWS::Lambda::Function Properties: Handler: lambda_function.lambda_handler Role: !GetAtt QueryStatusLambdaRole.Arn Runtime: !FindInMap ["SourceCode", "Runtime", "Python"] Timeout: 60 Environment: Variables: LOG_LEVEL: INFO # change to WARN, ERROR or DEBUG as needed Code: ZipFile: | import boto3 import time import os import logging import traceback import json client = boto3.client('athena') def lambda_handler(event, context): try: global log_level log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'ERROR' logging.getLogger().setLevel(log_level) logging.info(event) response = client.get_query_execution( QueryExecutionId=event['QueryExecutionId'] ) logging.info(response) return { 'Type': event['Type'], 'SegmentId': event['SegmentId'], 'SegmentName': event['SegmentName'], 'QueryExecutionId': response['QueryExecution']['QueryExecutionId'], 'OutputLocation': response['QueryExecution']['ResultConfiguration']['OutputLocation'], 'Status': response['QueryExecution']['Status']['State'], 'StatusMessage': response['QueryExecution']['Status']['StateChangeReason'] if 'StateChangeReason' in response['QueryExecution']['Status'] else '' } except Exception as error: logging.error('lambda_handler error: %s' % (error)) logging.error('lambda_handler trace: %s' % traceback.format_exc()) result = { 'Status': 'ERROR', 'Message': '%s' % (error) } return result QueryStatusLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "athena:GetQueryExecution" Resource: !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/*" ImportSegmentLambda: Type: AWS::Lambda::Function Properties: Handler: lambda_function.lambda_handler Role: !GetAtt ImportSegmentLambdaRole.Arn Runtime: !FindInMap ["SourceCode", "Runtime", "Python"] Timeout: 60 Environment: Variables: LOG_LEVEL: "INFO" # change to WARN, ERROR or DEBUG as needed APPLICATION_ID: !Ref PinpointProjectId ROLE_ARN: !GetAtt PinpointImportRole.Arn Code: ZipFile: | import boto3 import logging import traceback import time import os from urllib.parse import urlparse import json client = boto3.client('pinpoint') def lambda_handler(event, context): try: global log_level log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'ERROR' logging.getLogger().setLevel(log_level) logging.info(event) s3url = event['OutputLocation'] request = { 'Format': 'CSV', 'RegisterEndpoints': True, 'RoleArn': os.environ.get('ROLE_ARN'), 'S3Url': s3url } if event['Type'].lower() == 'direct': request['DefineSegment'] = True request['SegmentName'] = event['SegmentName'] if 'SegmentId' in event and event['SegmentId'] != 'none': request['SegmentId'] = event['SegmentId'] elif event['Type'].lower() == 'mutate': request['DefineSegment'] = False response = client.create_import_job( ApplicationId=os.environ.get('APPLICATION_ID'), ImportJobRequest=request ) logging.info(response) return { 'ImportId': response['ImportJobResponse']['Id'] } except Exception as error: logging.error('lambda_handler error: %s' % (error)) logging.error('lambda_handler trace: %s' % traceback.format_exc()) result = { 'Status': 'ERROR', 'Message': '%s' % (error) } return result ImportSegmentLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "mobiletargeting:GetSegmentVersion" - "mobiletargeting:GetSegment" - "mobiletargeting:GetSegments" - "mobiletargeting:GetSegmentVersions" - "mobiletargeting:CreateImportJob" Resource: - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}*" - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}" - Effect: "Allow" Action: "iam:PassRole" Resource: - !GetAtt PinpointImportRole.Arn PinpointImportRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - pinpoint.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "s3:Get*" - "s3:List*" Resource: - !Sub - arn:aws:s3:::${Bucket} - { Bucket: !Select [2 , !Split [ "/", !Ref QueryResultsS3Path ] ] } - !Sub - arn:aws:s3:::${Bucket}* - { Bucket: !Select [2 , !Split [ "/", !Ref QueryResultsS3Path ] ] } ImportSegmentStatusLambda: Type: AWS::Lambda::Function Properties: Handler: lambda_function.lambda_handler Role: !GetAtt ImportSegmentStatusLambdaRole.Arn Runtime: !FindInMap ["SourceCode", "Runtime", "Python"] Timeout: 60 Environment: Variables: LOG_LEVEL: INFO # change to WARN, ERROR or DEBUG as needed APPLICATION_ID: !Ref PinpointProjectId Code: ZipFile: | import boto3 import logging import traceback import time import os from urllib.parse import urlparse import json client = boto3.client('pinpoint') def lambda_handler(event, context): try: global log_level log_level = str(os.environ.get('LOG_LEVEL')).upper() if log_level not in [ 'DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL' ]: log_level = 'ERROR' logging.getLogger().setLevel(log_level) logging.info(event) response = client.get_import_job( ApplicationId=os.environ.get('APPLICATION_ID'), JobId=event['ImportId'] ) logging.info(response) return { 'ImportId': response['ImportJobResponse']['Id'], 'Status': response['ImportJobResponse']['JobStatus'], 'ResponseFormatted': response['ImportJobResponse'] } except Exception as error: logging.error('lambda_handler error: %s' % (error)) logging.error('lambda_handler trace: %s' % traceback.format_exc()) result = { 'Status': 'ERROR', 'Message': '%s' % (error) } return result ImportSegmentStatusLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "mobiletargeting:GetImportJob" Resource: - !Sub "arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${PinpointProjectId}/jobs/import/*" QueryStateMachine: Type: AWS::StepFunctions::StateMachine Properties: RoleArn: !GetAtt QueryStateMachineRole.Arn DefinitionString: !Sub - |- { "StartAt": "PerformQueryValidation", "States": { "PerformQueryValidation": { "Type": "Task", "Resource": "${PerformQueryValidationArn}", "Next": "IsQueryValid" }, "IsQueryValid": { "Type": "Choice", "Default": "InitiateQuery", "Choices": [{ "Variable": "$.Status", "StringEquals": "ERROR", "Next": "DidError" },{ "Variable": "$.Status", "StringEquals": "INVALID", "Next": "QueryInvalid" }] }, "DidError": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "${SNSTopicArn}", "Message": { "Message": "Lambda Error", "Result.$": "$.Message" }, "Subject": "Federated Segmentation Error", "MessageAttributes": { "notification_type": { "DataType": "String", "StringValue": "failure" } } }, "End": true }, "QueryInvalid": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "${SNSTopicArn}", "Message": { "Message": "Query Invalid", "Result.$": "$" }, "Subject": "Federated Segmentation Query Invalid", "MessageAttributes": { "notification_type": { "DataType": "String", "StringValue": "failure" } } }, "End": true }, "InitiateQuery": { "Type": "Task", "Resource": "${InitiateQueryArn}", "Next": "QueryWait" }, "QueryWait" : { "Type": "Wait", "Seconds": 5, "Next": "QueryStatus" }, "QueryStatus": { "Type": "Task", "Resource": "${QueryStatusArn}", "Next": "IsQueryFinished" }, "IsQueryFinished":{ "Type": "Choice", "Default": "QueryWait", "Choices": [{ "Variable": "$.Status", "StringEquals": "ERROR", "Next": "DidError" },{ "Variable": "$.Status", "StringEquals": "FAILED", "Next": "QueryFailed" },{ "Variable": "$.Status", "StringEquals": "SUCCEEDED", "Next": "ImportSegment" }] }, "ImportSegment": { "Type": "Task", "Resource": "${ImportSegmentArn}", "Next": "ImportSegmentWait" }, "ImportSegmentWait": { "Type": "Wait", "Seconds": 5, "Next": "ImportSegmentStatus" }, "ImportSegmentStatus": { "Type": "Task", "Resource": "${ImportSegmentStatusArn}", "Next": "IsImportSegmentFinished" }, "IsImportSegmentFinished": { "Type": "Choice", "Default": "ImportSegmentWait", "Choices": [ { "Variable": "$.Status", "StringEquals": "ERROR", "Next": "DidError" },{ "Variable": "$.Status", "StringEquals": "FAILED", "Next": "ImportSegmentFailed" }, { "Variable": "$.Status", "StringEquals": "COMPLETED", "Next": "EmitSuccess" } ] }, "EmitSuccess": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "ResultPath": null, "Parameters": { "TopicArn": "${SNSTopicArn}", "Message": { "Message": "Segmentation Query Successful", "Result.$": "$" }, "Subject": "Federated Segmentation Query Successful", "MessageAttributes": { "notification_type": { "DataType": "String", "StringValue": "success" } } }, "End": true }, "QueryFailed": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "${SNSTopicArn}", "Message": { "Message": "Query Failed", "Result.$": "$" }, "Subject": "Federated Segmentation Query Failed", "MessageAttributes": { "notification_type": { "DataType": "String", "StringValue": "failure" } } }, "End": true }, "ImportSegmentFailed": { "Type": "Task", "Resource": "arn:aws:states:::sns:publish", "Parameters": { "TopicArn": "${SNSTopicArn}", "Message": { "Message": "Import Failed", "ImportResult.$": "$" }, "Subject": "Federeated Segmentation Amazon Pinpoint Import Failed", "MessageAttributes": { "notification_type": { "DataType": "String", "StringValue": "failure" } } }, "End": true } } } - {PerformQueryValidationArn: !GetAtt PerformQueryValidation.Arn, InitiateQueryArn: !GetAtt InitiateQueryLambda.Arn, QueryStatusArn: !GetAtt QueryStatusLambda.Arn, ImportSegmentArn: !GetAtt ImportSegmentLambda.Arn, ImportSegmentStatusArn: !GetAtt ImportSegmentStatusLambda.Arn, SNSTopicArn: !Ref QueryNotificationTopic} QueryStateMachineRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "states.amazonaws.com" Action: - "sts:AssumeRole" Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: "lambda:InvokeFunction" Resource: - !GetAtt PerformQueryValidation.Arn - !GetAtt InitiateQueryLambda.Arn - !GetAtt QueryStatusLambda.Arn - !GetAtt ImportSegmentLambda.Arn - !GetAtt ImportSegmentStatusLambda.Arn - Effect: "Allow" Action: sns:Publish Resource: !Ref QueryNotificationTopic QueryNotificationTopic: Type: AWS::SNS::Topic Properties: DisplayName: 'PinpointSegmentationQuery' KmsMasterKeyId: alias/aws/sns Outputs: QueryStatusSNSTopicArn: Description: SNS Topic used to provide updates to status Value: !Ref QueryNotificationTopic InitateQueryLambdaRoleArn: Description: IAM Role ARN that will need access to the underlying data sources Value: !GetAtt InitiateQueryLambdaRole.Arn