AWSTemplateFormatVersion: 2010-09-09 Description: Personalize ingest solution Transform: AWS::Serverless-2016-10-31 Parameters: CustomerInteractionShardCount: Type: Number Default: 1 Description: Number of Kinesis Shards to ingest the customer interactions. ViewedItemHistoryDays: Type: Number Default: 30 Description: Number of Days the user interactions should be stored for the recently visited functionality. TrackingId: Type: String Description: Tracking ID of Personalize Event tracker that should be updated based on user interactions. CampaignArn: Type: String Description: ARN of the Personalize Campaign that should be used to get recommendations. CreateRecentlyVisitedItems: Type: String Description: Set to 'true' if resources should be created to track recently visited items for each customer and serve them via an API. Default: true StoreUserInteractionInDatalake: Type: String Description: Set to 'true' if resources should be created to store user interactions in an S3 bucket for datalake use. Default: true Conditions: CreateRecentlyVisitedResources: !Equals [ !Ref CreateRecentlyVisitedItems, true ] CreateDatalakeResources: !Equals [ !Ref StoreUserInteractionInDatalake, true ] Resources: CustomerInteractionStream: Type: AWS::Kinesis::Stream Properties: RetentionPeriodHours: 168 ShardCount: Ref: CustomerInteractionShardCount FirehoseS3DeliveryStream: Condition: CreateDatalakeResources DependsOn: - deliveryPolicy - CustomerInteractionStream - kinesisSourcePolicy - kinesisSourceRole - deliveryRole Type: AWS::KinesisFirehose::DeliveryStream Properties: DeliveryStreamType: KinesisStreamAsSource KinesisStreamSourceConfiguration: KinesisStreamARN: Fn::GetAtt: - "CustomerInteractionStream" - "Arn" RoleARN: Fn::GetAtt: - "kinesisSourceRole" - "Arn" ExtendedS3DestinationConfiguration: BucketARN: !Join - '' - - 'arn:aws:s3:::' - !Ref rawDataBucket BufferingHints: IntervalInSeconds: '60' SizeInMBs: '50' CompressionFormat: GZIP Prefix: interactions/ RoleARN: Fn::GetAtt: - "deliveryRole" - "Arn" rawDataBucket: Condition: CreateDatalakeResources Type: AWS::S3::Bucket Properties: VersioningConfiguration: Status: Enabled kinesisSourceRole: Condition: CreateDatalakeResources Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Sid: '' Effect: Allow Principal: Service: firehose.amazonaws.com Action: 'sts:AssumeRole' Condition: StringEquals: 'sts:ExternalId': !Ref 'AWS::AccountId' kinesisSourcePolicy: Condition: CreateDatalakeResources Type: AWS::IAM::Policy Properties: PolicyName: firehose_delivery_policy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 'kinesis:Get*' - 'kinesis:DescribeStream' Resource: - Fn::GetAtt: - "CustomerInteractionStream" - "Arn" Roles: - !Ref kinesisSourceRole deliveryRole: Condition: CreateDatalakeResources Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Sid: '' Effect: Allow Principal: Service: firehose.amazonaws.com Action: 'sts:AssumeRole' Condition: StringEquals: 'sts:ExternalId': !Ref 'AWS::AccountId' deliveryPolicy: Condition: CreateDatalakeResources Type: AWS::IAM::Policy Properties: PolicyName: firehose_delivery_policy PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Action: - 's3:AbortMultipartUpload' - 's3:GetBucketLocation' - 's3:GetObject' - 's3:ListBucket' - 's3:ListBucketMultipartUploads' - 's3:PutObject' Resource: - !Join - '' - - 'arn:aws:s3:::' - !Ref rawDataBucket - !Join - '' - - 'arn:aws:s3:::' - !Ref rawDataBucket - '*' Roles: - !Ref deliveryRole UpdateHistoryRole: Type: "AWS::IAM::Role" Condition: CreateRecentlyVisitedResources Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "lambda.amazonaws.com" Action: - "sts:AssumeRole" Path: "/" ManagedPolicyArns: - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" Policies: - PolicyName: "UpdateHistoryTable" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: "dynamodb:UpdateItem" Resource: Fn::GetAtt: - "historyTable" - "Arn" - Effect: "Allow" Action: "kinesis:GetRecords" Resource: Fn::GetAtt: - "CustomerInteractionStream" - "Arn" - Effect: "Allow" Action: "kinesis:GetShardIterator" Resource: Fn::GetAtt: - "CustomerInteractionStream" - "Arn" - Effect: "Allow" Action: "kinesis:DescribeStream" Resource: Fn::GetAtt: - "CustomerInteractionStream" - "Arn" - Effect: "Allow" Action: "kinesis:ListStreams" Resource: Fn::GetAtt: - "CustomerInteractionStream" - "Arn" UpdateHistoryFunction: Type: AWS::Serverless::Function Condition: CreateRecentlyVisitedResources Properties: FunctionName: UpdateUserHistory Handler: lambda.lambda_handler Role: Fn::GetAtt: - "UpdateHistoryRole" - "Arn" CodeUri: lambda-update-history/ Environment: Variables: TABLE_NAME: Ref: historyTable ITEM_TTL_DAYS: Ref: ViewedItemHistoryDays Runtime: python3.7 Timeout: 300 PutPersonalizeEventsRole: Type: "AWS::IAM::Role" Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "lambda.amazonaws.com" Action: - "sts:AssumeRole" Path: "/" ManagedPolicyArns: - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" Policies: - PolicyName: "PutPersonalizeEvents" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: "personalize:PutEvents" Resource: "*" - Effect: "Allow" Action: "kinesis:GetRecords" Resource: Fn::GetAtt: - "CustomerInteractionStream" - "Arn" - Effect: "Allow" Action: "kinesis:GetShardIterator" Resource: Fn::GetAtt: - "CustomerInteractionStream" - "Arn" - Effect: "Allow" Action: "kinesis:DescribeStream" Resource: Fn::GetAtt: - "CustomerInteractionStream" - "Arn" - Effect: "Allow" Action: "kinesis:ListStreams" Resource: Fn::GetAtt: - "CustomerInteractionStream" - "Arn" PutPersonalizeEventsFunction: Type: AWS::Serverless::Function Properties: FunctionName: PutPersonalizeEvents Handler: lambda.lambda_handler Role: Fn::GetAtt: - "PutPersonalizeEventsRole" - "Arn" CodeUri: lambda-personalize-events/ Environment: Variables: TRACKING_ID: Ref: TrackingId Runtime: python3.7 Timeout: 300 historyTable: Type: AWS::DynamoDB::Table Condition: CreateRecentlyVisitedResources Properties: AttributeDefinitions: - AttributeName: "UserId" AttributeType: "S" - AttributeName: "TypeTime" AttributeType: "S" KeySchema: - AttributeName: "UserId" KeyType: "HASH" - AttributeName: "TypeTime" KeyType: "RANGE" BillingMode: PAY_PER_REQUEST TimeToLiveSpecification: AttributeName: "TTLTime" Enabled: true CustomerHistoryEventSource: Type: AWS::Lambda::EventSourceMapping Condition: CreateRecentlyVisitedResources Properties: EventSourceArn: Fn::GetAtt: - "CustomerInteractionStream" - "Arn" FunctionName: Fn::GetAtt: - "UpdateHistoryFunction" - "Arn" StartingPosition: "TRIM_HORIZON" MaximumRetryAttempts: 2 PersonalizeEventSource: Condition: CreateDatalakeResources Type: AWS::Lambda::EventSourceMapping Properties: EventSourceArn: Fn::GetAtt: - "CustomerInteractionStream" - "Arn" FunctionName: Fn::GetAtt: - "PutPersonalizeEventsFunction" - "Arn" StartingPosition: "TRIM_HORIZON" MaximumRetryAttempts: 2 GetHistoryFunction: Type: AWS::Serverless::Function Condition: CreateRecentlyVisitedResources Properties: FunctionName: GetUserHistory Handler: lambda.lambda_handler Role: Fn::GetAtt: - "GetHistoryRole" - "Arn" CodeUri: lambda-history-api/ Events: HttpPost: Type: HttpApi Properties: Path: '/history/{userId}' Method: get Environment: Variables: TABLE_NAME: Ref: historyTable Runtime: python3.7 Timeout: 300 GetHistoryRole: Type: "AWS::IAM::Role" Condition: CreateRecentlyVisitedResources Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "lambda.amazonaws.com" Action: - "sts:AssumeRole" Path: "/" ManagedPolicyArns: - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" Policies: - PolicyName: "UpdateHistoryTable" PolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Action: "dynamodb:Query" Resource: Fn::GetAtt: - "historyTable" - "Arn" CustomerInteractionApiGatewayRole: Type: AWS::IAM::Role Properties: AssumeRolePolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Principal: Service: - apigateway.amazonaws.com Action: - sts:AssumeRole Policies: - PolicyName: 'PersonalizeIngestApiGatewayRolePolicy' PolicyDocument: Version: 2012-10-17 Statement: - Effect: Allow Sid: LogGroupsAccess Action: - logs:DescribeLogGroups Resource: - '*' - Effect: Allow Sid: CloudWatchAccess Action: - cloudwatch:* Resource: - '*' - Effect: Allow Sid: KinesisStreamsAccess Action: - kinesis:* Resource: Fn::GetAtt: - "CustomerInteractionStream" - "Arn" CustomerInteractionApiGatewayStage: Type: AWS::ApiGateway::Deployment Properties: RestApiId: !Ref 'CustomerInteractionAPI' StageName: 'prod' CustomerInteractionAPI: Type: AWS::ApiGateway::RestApi Properties: Name: 'PersonalizeIngestAPI' Description: API used for requests FailOnWarnings: false Body: openapi: 3.0.0 components: schemas: Interaction: title: Interaction type: object properties: userId: required: true type: string itemId: required: true type: string type: type: string default: impression info: title: KinesisProxy version: '2016-03-31T18:25:32Z' paths: /interaction: post: requestBody: required: true content: application/json: schema: $ref: '#/components/schemas/Interaction' responses: '200': description: 200 response x-amazon-apigateway-integration: credentials: !GetAtt 'CustomerInteractionApiGatewayRole.Arn' httpMethod: POST requestParameters: integration.request.header.Content-Type: '''application/x-amz-json-1.1''' requestTemplates: application/json: !Sub | #set($allParams = $input.path('$')) { "StreamName": "${CustomerInteractionStream}", "Data": "$util.base64Encode($input.json('$'))", "PartitionKey": $input.json('userId') } responses: default: statusCode: '200' type: aws uri: !Sub 'arn:aws:apigateway:${AWS::Region}:kinesis:action/PutRecord' RecommendationRole: Type: "AWS::IAM::Role" Properties: AssumeRolePolicyDocument: Version: "2012-10-17" Statement: - Effect: "Allow" Principal: Service: - "lambda.amazonaws.com" Action: - "sts:AssumeRole" Path: "/" ManagedPolicyArns: - "arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole" Policies: - PolicyName: "UpdateHistoryTable" PolicyDocument: Version: "2012-10-17" Statement: - Effect: Allow Sid: PersonalizeAccess Action: - personalize:GetRecommendations Resource: Ref: CampaignArn GetRecommendationFunction: Type: AWS::Serverless::Function Properties: FunctionName: GetRecommendation Handler: lambda.lambda_handler Role: Fn::GetAtt: - "RecommendationRole" - "Arn" CodeUri: lambda-recommendation-api/ Events: HttpGet: Type: HttpApi Properties: Path: '/recommendation/{userId}' Method: get Environment: Variables: PERSONALIZE_CAMPAIGN_ARN: Ref: CampaignArn Runtime: python3.7 Timeout: 300 Outputs: RawDataBucket: Condition: CreateDatalakeResources Description: Bucket that stores the raw user interaction data. Value: !Join ['', ['s3://', Ref: rawDataBucket, /]] CustomerInteractionStreamARN: Description: ARN of the stream to push the customer interactions to. Value: Fn::GetAtt: - "CustomerInteractionStream" - "Arn" CustomerInteractionStreamName: Description: Name of the stream to push the customer interactions to. Value: Ref: CustomerInteractionStream PersonalizationApiURL: Description: "API endpoint URL for Prod environment" Value: !Sub "https://${ServerlessHttpApi}.execute-api.${AWS::Region}.amazonaws.com/" InteractionApiURL: Description: "API endpoint URL for Prod environment" Value: !Sub "https://${CustomerInteractionAPI}.execute-api.${AWS::Region}.amazonaws.com/prod/interaction/"