AWSTemplateFormatVersion: 2010-09-09
Description: Advanced Segmentation with S3 and Athena

Parameters:

  PinpointProjectId:
    Type: String
    Description: Amazon Pinpoint Project ID if one already exists, blank to create one

  PinpointProjectName:
    Type: String
    Default: "My Pinpoint Project"
    Description: "If no PinpointProjectId provided, name to be used to create the Pinpoint project"

  S3DataLakeBucket:
    Type: String
    Description: The S3 Bucket Name of the existing Data Lake

  S3DataLakeGlueDatabaseName:
    Type: String
    Description: Name of the AWS Glue Database that is configured for your S3 Data Lake

  S3QueryOutputLocationBucketName:
    Type: String
    Description: S3 Bucket Name for the output location where queries can be written to temporarily by Athena. Ex my-bucket-name

  S3QueryOutputLocationBucketPrefix:
    Type: String
    Description: S3 Prefix/Path for the output location where queries can be written to temporarily by Athena. Ex path/to/a/folder

Conditions:
  NeedsPinpointProjectId: !Equals
    - ''
    - !Ref PinpointProjectId

Resources:

  PinpointApplication:
    Type: AWS::Pinpoint::App
    Condition: NeedsPinpointProjectId
    Properties:
      Name: !Ref PinpointProjectName

  QueryStartLambda:
    Type: AWS::Lambda::Function
    Properties:
      Handler: index.lambda_handler
      Role: !GetAtt QueryStartLambdaRole.Arn
      Runtime: "python3.7"
      Timeout: 60
      Environment:
        Variables:
          LOG_LEVEL: "INFO"
          GLUE_DATABASE_NAME: !Ref S3DataLakeGlueDatabaseName
          S3_OUTPUT_LOCATION_BUCKET: !Ref S3QueryOutputLocationBucketName
          S3_OUTPUT_LOCATION_PREFIX: !Ref S3QueryOutputLocationBucketPrefix
      Code:
        ZipFile: |
          import boto3
          import logging
          import os

          athena = boto3.client('athena')

          def lambda_handler(event, context):
              global log_level
              log_level = str(os.environ.get('LOG_LEVEL')).upper()
              if log_level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
                  log_level = 'ERROR'
              logging.getLogger().setLevel(log_level)

              logging.info(event)

              # CUSTOMER TODO - Provide Valid Data Lake Query for Athena
              query = '''
                SELECT EndpointID
                FROM [ENTER S3 DATA LAKE DETAILS]
                WHERE [ENTER COMPLEX SEGMENTATION LOGIC]
              '''

              response = athena.start_query_execution(
                  QueryString=query,
                  QueryExecutionContext={
                      'Database': os.environ.get('GLUE_DATABASE_NAME')
                  },
                  ResultConfiguration={
                      'OutputLocation': 's3://' + os.environ.get('S3_OUTPUT_LOCATION_BUCKET') + '/' + os.environ.get('S3_OUTPUT_LOCATION_PREFIX') + '/'
                  }
              )

              logging.info(response)

              return {
                  'QueryExecutionId': response['QueryExecutionId']
              }

  QueryStatusLambda:
    Type: AWS::Lambda::Function
    Properties:
      Role: !GetAtt QueryStatusLambdaRole.Arn
      Handler: index.lambda_handler
      Runtime: "python3.7"
      Timeout: 60
      Environment:
        Variables:
          LOG_LEVEL: "INFO"
      Code:
        ZipFile: |
          import boto3
          import logging
          import os

          athena = boto3.client('athena')

          def lambda_handler(event, context):
              global log_level
              log_level = str(os.environ.get('LOG_LEVEL')).upper()
              if log_level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
                  log_level = 'ERROR'
              logging.getLogger().setLevel(log_level)

              logging.info(event)

              response = athena.get_query_execution(
                  QueryExecutionId=event['QueryExecutionId']
              )

              logging.info(response)

              return {
                  'QueryExecutionId': event['QueryExecutionId'],
                  'OutputLocation': response['QueryExecution']['ResultConfiguration']['OutputLocation'],
                  'Status': response['QueryExecution']['Status']['State']
              }
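  # UpdateEndpointsLambda streams the CSV results that Athena wrote to the query
  # output location and adds the configured segment name to each returned endpoint's
  # AdvancedSegmentTarget attribute in Pinpoint, preserving any values already present.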
  UpdateEndpointsLambda:
    Type: AWS::Lambda::Function
    Properties:
      Role: !GetAtt UpdateEndpointsLambdaRole.Arn
      Handler: index.lambda_handler
      Runtime: "python3.7"
      Timeout: 900
      MemorySize: 2048
      Environment:
        Variables:
          LOG_LEVEL: "INFO"
          S3_OUTPUT_LOCATION_BUCKET: !Ref S3QueryOutputLocationBucketName
          S3_OUTPUT_LOCATION_PREFIX: !Ref S3QueryOutputLocationBucketPrefix
          PINPOINT_PROJECT_ID: !If
            - NeedsPinpointProjectId
            - !Ref PinpointApplication
            - !Ref PinpointProjectId
      Code:
        ZipFile: |
          import boto3
          import logging
          import os

          s3 = boto3.client('s3')
          pinpoint = boto3.client('pinpoint')

          def lambda_handler(event, context):
              global log_level
              log_level = str(os.environ.get('LOG_LEVEL')).upper()
              if log_level not in ['DEBUG', 'INFO', 'WARNING', 'ERROR', 'CRITICAL']:
                  log_level = 'ERROR'
              logging.getLogger().setLevel(log_level)

              logging.info(event)

              # CUSTOMER TODO - assign a segment name for Pinpoint attributes
              segment_attribute_value = 'my_other_segment_name'

              # The result object key is OutputLocation minus the "s3://<bucket>/" prefix
              obj = s3.get_object(
                  Bucket=os.environ.get('S3_OUTPUT_LOCATION_BUCKET'),
                  Key=event['OutputLocation'][6 + len(os.environ.get('S3_OUTPUT_LOCATION_BUCKET')):]
              )

              # get rid of header row
              obj['Body']._raw_stream.readline()

              for line in obj['Body']._raw_stream:
                  line = line.decode('utf-8').replace('\r', '').replace('\n', '').replace('"', '')
                  logging.info('Reading Line: ' + line)

                  # CUSTOMER TODO - Assumes that the first column of the results contains the EndpointId
                  endpointId = line.split(",")[0]

                  try:
                      endpoint = pinpoint.get_endpoint(
                          ApplicationId=os.environ['PINPOINT_PROJECT_ID'],
                          EndpointId=endpointId
                      )

                      segmentationValues = [segment_attribute_value]

                      # If endpoint already has segmentation values, append them
                      if 'Attributes' in endpoint['EndpointResponse'] and 'AdvancedSegmentTarget' in endpoint['EndpointResponse']['Attributes']:
                          segmentationValues.extend(endpoint['EndpointResponse']['Attributes']['AdvancedSegmentTarget'])

                      updateResponse = pinpoint.update_endpoint(
                          ApplicationId=os.environ['PINPOINT_PROJECT_ID'],
                          EndpointId=endpointId,
                          EndpointRequest={
                              'Attributes': {
                                  'AdvancedSegmentTarget': segmentationValues
                              }
                          }
                      )
                      logging.info(updateResponse)

                  except pinpoint.exceptions.NotFoundException as error:
                      logging.info('Did not find the endpoint id %s in your project', endpointId)
                  except Exception as error:
                      logging.error(error)

  DailyStateMachine:
    Type: AWS::StepFunctions::StateMachine
    Properties:
      RoleArn: !GetAtt DailyStateMachineRole.Arn
      DefinitionString: !Sub
        - |-
          {
            "StartAt": "QueryStart",
            "States": {
              "QueryStart": {
                "Type": "Task",
                "Resource": "${QueryStartArn}",
                "Next": "QueryWait"
              },
              "QueryWait": {
                "Type": "Wait",
                "Seconds": 5,
                "Next": "QueryStatus"
              },
              "QueryStatus": {
                "Type": "Task",
                "Resource": "${QueryStatusArn}",
                "Next": "IsQueryFinished"
              },
              "IsQueryFinished": {
                "Type": "Choice",
                "Default": "QueryWait",
                "Choices": [{
                  "Variable": "$.Status",
                  "StringEquals": "FAILED",
                  "Next": "QueryFailed"
                },{
                  "Variable": "$.Status",
                  "StringEquals": "SUCCEEDED",
                  "Next": "UpdateEndpoints"
                }]
              },
              "QueryFailed": {
                "Type": "Fail",
                "Cause": "Athena Query failed",
                "Error": "Athena Query failed"
              },
              "UpdateEndpoints": {
                "Type": "Task",
                "Resource": "${UpdateEndpointsArn}",
                "End": true
              }
            }
          }
        - {QueryStartArn: !GetAtt QueryStartLambda.Arn, QueryStatusArn: !GetAtt QueryStatusLambda.Arn, UpdateEndpointsArn: !GetAtt UpdateEndpointsLambda.Arn}
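  # DailyCloudWatchEvent starts the state machine on a schedule;
  # cron(0 2 * * ? *) fires once a day at 02:00 UTC.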
*)" State: "ENABLED" RoleArn: !GetAtt DailyCloudWatchEventRole.Arn Targets: - Arn: !Ref DailyStateMachine Id: "DailyStateMachine" RoleArn: !GetAtt DailyCloudWatchEventRole.Arn DailyCloudWatchEventRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - "events.amazonaws.com" Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: "states:StartExecution" Resource: !Ref DailyStateMachine DailyStateMachineRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "states.amazonaws.com" Action: - "sts:AssumeRole" Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: "lambda:InvokeFunction" Resource: - !GetAtt QueryStartLambda.Arn - !GetAtt QueryStatusLambda.Arn - !GetAtt UpdateEndpointsLambda.Arn QueryStartLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "s3:GetBucketLocation" - "s3:GetObject" - "s3:ListBucket" - "s3:ListBucketMultipartUploads" - "s3:ListMultipartUploadParts" - "s3:AbortMultipartUpload" - "s3:CreateBucket" - "s3:PutObject" Resource: - !Sub "arn:aws:s3:::${S3DataLakeBucket}/*" - !Sub "arn:aws:s3:::${S3DataLakeBucket}" - !Sub "arn:aws:s3:::${S3QueryOutputLocationBucketName}/*" - !Sub "arn:aws:s3:::${S3QueryOutputLocationBucketName}" - Effect: "Allow" Action: - "athena:StartQueryExecution" - "athena:GetNamedQuery" Resource: !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/*" - Effect: "Allow" Action: - "glue:GetDatabase" - "glue:GetDatabases" - "glue:GetTable" - "glue:GetTables" - "glue:GetPartition" - "glue:GetPartitions" Resource: - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:table/*" - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:database/*" - !Sub "arn:aws:glue:${AWS::Region}:${AWS::AccountId}:catalog" QueryStatusLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "athena:GetQueryExecution" Resource: !Sub "arn:aws:athena:${AWS::Region}:${AWS::AccountId}:workgroup/*" UpdateEndpointsLambdaRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - lambda.amazonaws.com Action: - 'sts:AssumeRole' Path: "/" Policies: - PolicyName: "root" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: - "logs:CreateLogGroup" - "logs:CreateLogStream" - "logs:PutLogEvents" Resource: !Sub "arn:aws:logs:${AWS::Region}:${AWS::AccountId}:*" - Effect: "Allow" Action: - "s3:GetObject" - "s3:PutObject" Resource: - !Sub 
"arn:aws:s3:::${S3QueryOutputLocationBucketName}/*" - Effect: "Allow" Action: - "mobiletargeting:UpdateEndpoint" - "mobiletargeting:GetEndpoint" Resource: !Sub - 'arn:aws:mobiletargeting:${AWS::Region}:${AWS::AccountId}:apps/${ProjectId}/endpoints/*' - {ProjectId: !If [NeedsPinpointProjectId, !Ref PinpointApplication, !Ref PinpointProjectId] }