---
# AWS SAM template for an Amazon Personalize streaming-events API.
#
# What this template wires together (all visible below):
#   * POST /recommendations -> GetRecommendations Lambda (AWS_PROXY integration)
#   * POST /events          -> kinesis PutRecord on KinesisStream (AWS integration)
#   * KinesisStream         -> PutEvents Lambda, via the registered stream
#                              consumer KinesisStreamConsumer
#   * KinesisStream         -> Firehose delivery stream -> S3 bucket, under the
#                              prefix "personalize-events/"
#   * Both POST methods require an API key; the key is attached to the "dev2"
#     stage through a usage plan.
AWSTemplateFormatVersion: '2010-09-09'
Transform: AWS::Serverless-2016-10-31
Description: >
  personalize-streaming-events

Globals:
  Function:
    # Applies to every AWS::Serverless::Function in this template.
    Timeout: 3

Parameters:
  EventTrackerIdParam:
    Type: String
    Description: Please enter your Amazon Personalize Event Tracker ID
    Default: '9bec3974-v6bd-5fcf'
  CampaignARNParam:
    Type: String
    Description: >-
      Please enter your Amazon Personalize Personalize CampaignARN where you
      would like to get recommendations from
    Default: campaignARNDefault
  S3BucketParam:
    Type: String
    Description: >-
      Please enter your S3 Bucket where you would like to persist your
      interactions data from Kinesis
    Default: s3BucketDefault
  KinesisBatchSize:
    Type: Number
    Description: Please enter your preferred batch size procesing data from Kinesis
    Default: 50
  ShardCount:
    Type: Number
    Description: Please enter your preferred Amazon Kinesis Shard Count
    Default: 1

Resources:
  # # DDB History table
  # DynamoTable:
  #   Type: "AWS::DynamoDB::Table"
  #   Properties:
  #     AttributeDefinitions:
  #       -
  #         AttributeName: "userId"
  #         AttributeType: "S"
  #       -
  #         AttributeName: "timeStamp"
  #         AttributeType: "S"
  #     KeySchema:
  #       -
  #         AttributeName: "userId"
  #         KeyType: "HASH"
  #       -
  #         AttributeName: "timeStamp"
  #         KeyType: "RANGE"
  #     ProvisionedThroughput:
  #       ReadCapacityUnits: "5"
  #       WriteCapacityUnits: "5"
  #     TableName: "se-personalize-history-tracking-table"

  # SSM Parameters
  # TableName:
  #   Type: AWS::SSM::Parameter
  #   Properties:
  #     Name: se-personalize-streaming-events-table
  #     Type: String
  #     Value: !Ref DynamoTable
  #     Description: SSM parameter containing the DynamoDB table name

  # Each stack parameter is mirrored into an SSM parameter; other resources in
  # this template read the value back with !GetAtt <Resource>.Value.
  EventTrackerId:
    Type: AWS::SSM::Parameter
    Properties:
      Name: se-personalize-event-tracker-id
      Type: String
      Value: !Ref EventTrackerIdParam
      Description: SSM parameter containing the EventTrackerId of your Amazon Personalize deployment
  CampaignARN:
    Type: AWS::SSM::Parameter
    Properties:
      Name: se-personalize-campaign-arn
      Type: String
      Value: !Ref CampaignARNParam
      Description: SSM parameter containing the CampaignARN of your Amazon Personalize deployment
  S3BucketName:
    Type: AWS::SSM::Parameter
    Properties:
      Name: se-personalize-s3-bucket-name
      Type: String
      Value: !Ref S3BucketParam
      Description: SSM parameter containing the S3 where your interactions data will be persisted

  # Lambda functions
  # NOTE(review): python3.7 and nodejs12.x appear to be past AWS Lambda's
  # end-of-support dates. They are left unchanged because moving runtimes
  # requires validating the handler code, which is not visible here - confirm.
  GetRecommendations:
    Type: AWS::Serverless::Function
    # Wait for the inline policy so the role is fully permissioned first.
    DependsOn:
      - LambdaPolicies
    Properties:
      CodeUri: lambdas/getRecommendations
      Handler: getRecommendations.handler
      Runtime: python3.7
      Role: !GetAtt LambdaRole.Arn
      MemorySize: 1024
      Environment:
        Variables:
          CAMPAIGN_ARN: !GetAtt CampaignARN.Value
  PutEvents:
    Type: AWS::Serverless::Function
    DependsOn:
      - LambdaPolicies
    Properties:
      CodeUri: lambdas/putevents
      Handler: putevents.handler
      Runtime: nodejs12.x
      Role: !GetAtt LambdaRole.Arn
      Events:
        Stream:
          Type: Kinesis
          Properties:
            # Reads through the registered consumer, not the stream ARN itself.
            Stream: !GetAtt KinesisStreamConsumer.ConsumerARN
            StartingPosition: LATEST
            BatchSize: !Ref KinesisBatchSize
      Environment:
        Variables:
          TRACKING_ID: !GetAtt EventTrackerId.Value

  # Kinesis Stream
  KinesisStream:
    Type: AWS::Kinesis::Stream
    Properties:
      ShardCount: !Ref ShardCount
  KinesisStreamConsumer:
    Type: AWS::Kinesis::StreamConsumer
    Properties:
      StreamARN: !GetAtt KinesisStream.Arn
      ConsumerName: LambdaConsumer

  # Kinesis Firehose
  Deliverystream:
    Type: AWS::KinesisFirehose::DeliveryStream
    # The delivery role needs its policy attached before Firehose is created.
    DependsOn:
      - KinesisDeliveryPolicy
    Properties:
      DeliveryStreamName: se-kinesis-delivery-stream
      DeliveryStreamType: KinesisStreamAsSource
      KinesisStreamSourceConfiguration:
        KinesisStreamARN: !GetAtt KinesisStream.Arn
        RoleARN: !GetAtt KinesisDeliveryRole.Arn
      ExtendedS3DestinationConfiguration:
        BucketARN: !Sub 'arn:aws:s3:::${S3BucketName.Value}'
        BufferingHints:
          IntervalInSeconds: 60
          SizeInMBs: 50
        CompressionFormat: UNCOMPRESSED
        Prefix: personalize-events/
        RoleARN: !GetAtt KinesisDeliveryRole.Arn

  # APIGW
  ApiGatewayEndpoint:
    Type: AWS::ApiGateway::RestApi
    DependsOn:
      - GetRecommendations
      - PutEvents
    Properties:
      Name: se-personalize-endpoint
      Description: Amazon Personalize Streaming Events Enpoint

  # Create the response model for your API
  ApiGatewayModel:
    Type: AWS::ApiGateway::Model
    DependsOn:
      - GetRecommendations
      - PutEvents
    Properties:
      ContentType: application/json
      RestApiId: !Ref ApiGatewayEndpoint
      Schema: {}

  # API KEY
  APIKey:
    Type: AWS::ApiGateway::ApiKey
    DependsOn:
      - apiGatewayDeployment
      - APIStage
    Properties:
      Description: This is an API key
      Enabled: true
      StageKeys:
        - RestApiId: !Ref ApiGatewayEndpoint
          StageName: dev2
  usagePlan:
    Type: AWS::ApiGateway::UsagePlan
    DependsOn:
      - APIKey
      - APIStage
    Properties:
      ApiStages:
        - ApiId: !Ref ApiGatewayEndpoint
          Stage: !Ref APIStage
      Description: "Customer ABC's usage plan"
      UsagePlanName: se-usage-plan
  usagePlanKey:
    Type: AWS::ApiGateway::UsagePlanKey
    DependsOn:
      - APIKey
      - APIStage
    Properties:
      KeyId: !Ref APIKey
      KeyType: API_KEY
      UsagePlanId: !Ref usagePlan
  APIStage:
    Type: AWS::ApiGateway::Stage
    Properties:
      StageName: dev2
      # NOTE(review): description says "Prod" but the stage is named "dev2";
      # left as-is - confirm which one is intended.
      Description: Prod Stage
      RestApiId: !Ref ApiGatewayEndpoint
      DeploymentId: !Ref apiGatewayDeployment

  # Deploy your API endpoint
  apiGatewayDeployment:
    Type: AWS::ApiGateway::Deployment
    # A deployment snapshots whatever methods exist when it is created, so it
    # must depend on EVERY method in this template. Previously only the
    # /recommendations POST was listed, which let CloudFormation create the
    # deployment before the /events POST and the OPTIONS methods existed.
    DependsOn:
      - GetRecommendations
      - PutEvents
      - LambdaGetRecommendationsMethod
      - RecommendationsCORESOptionsMethod
      - KinesisPutEventsMethod
      - EventsCORESOptionsMethod
    Properties:
      RestApiId: !Ref ApiGatewayEndpoint

  # Create /recommendations resource
  RecommendationsResource:
    Type: AWS::ApiGateway::Resource
    DependsOn:
      - GetRecommendations
      - PutEvents
    Properties:
      RestApiId: !Ref ApiGatewayEndpoint
      ParentId: !GetAtt ApiGatewayEndpoint.RootResourceId
      PathPart: recommendations

  # Integrate with your Get Recommendations Lambda
  LambdaGetRecommendationsMethod:
    Type: AWS::ApiGateway::Method
    DependsOn:
      - GetRecommendations
      - PutEvents
    Properties:
      ApiKeyRequired: true
      RestApiId: !Ref ApiGatewayEndpoint
      ResourceId: !Ref RecommendationsResource
      HttpMethod: POST
      AuthorizationType: NONE
      Integration:
        IntegrationHttpMethod: POST
        Type: AWS_PROXY
        Uri: !Sub
          - 'arn:aws:apigateway:${AWS::Region}:lambda:path/2015-03-31/functions/${lambdaArn}/invocations'
          - lambdaArn: !GetAtt GetRecommendations.Arn

  # Activate CORS
  # MOCK integration that answers OPTIONS with a 200 carrying the CORS headers.
  RecommendationsCORESOptionsMethod:
    Type: AWS::ApiGateway::Method
    Properties:
      AuthorizationType: NONE
      RestApiId: !Ref ApiGatewayEndpoint
      ResourceId: !Ref RecommendationsResource
      HttpMethod: OPTIONS
      Integration:
        Type: MOCK
        IntegrationResponses:
          - StatusCode: '200'
            ResponseParameters:
              method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token'"
              method.response.header.Access-Control-Allow-Methods: "'POST,OPTIONS'"
              method.response.header.Access-Control-Allow-Origin: "'*'"
            ResponseTemplates:
              application/json: ''
        PassthroughBehavior: WHEN_NO_MATCH
        RequestTemplates:
          application/json: '{"statusCode": 200}'
      MethodResponses:
        - StatusCode: '200'
          ResponseModels:
            application/json: Empty
          # false = the header is declared on the response but not required.
          ResponseParameters:
            method.response.header.Access-Control-Allow-Headers: false
            method.response.header.Access-Control-Allow-Methods: false
            method.response.header.Access-Control-Allow-Origin: false

  # Create the invoke permissions
  recommendationsLambdaApiGatewayInvokeGET:
    Type: AWS::Lambda::Permission
    DependsOn:
      - GetRecommendations
      - PutEvents
      - apiGatewayDeployment
    Properties:
      Action: lambda:InvokeFunction
      FunctionName: !GetAtt GetRecommendations.Arn
      Principal: apigateway.amazonaws.com
      SourceArn: !Sub 'arn:aws:execute-api:${AWS::Region}:${AWS::AccountId}:${ApiGatewayEndpoint}/*/POST/recommendations'

  # Create /events resource
  EventsResource:
    Type: AWS::ApiGateway::Resource
    Properties:
      RestApiId: !Ref ApiGatewayEndpoint
      ParentId: !GetAtt ApiGatewayEndpoint.RootResourceId
      PathPart: events
  KinesisPutEventsMethod:
    Type: AWS::ApiGateway::Method
    Properties:
      ApiKeyRequired: true
      RestApiId: !Ref ApiGatewayEndpoint
      ResourceId: !Ref EventsResource
      HttpMethod: POST
      AuthorizationType: NONE
      Integration:
        IntegrationHttpMethod: POST
        # Role API Gateway assumes to call kinesis:PutRecord on the stream.
        Credentials: !GetAtt APIGatewayKinesisRole.Arn
        Type: AWS
        Uri: !Sub 'arn:aws:apigateway:${AWS::Region}:kinesis:action/PutRecord'
        RequestParameters:
          integration.request.header.Content-Type: "'application/x-amz-json-1.1'"
        # Mapping template: wraps the request body (base64-encoded) in a
        # PutRecord payload and takes the partition key from $.SessionId.
        # Only ${streamName} is substituted by !Sub; the $input/$util
        # references are left for API Gateway to evaluate.
        RequestTemplates:
          application/json: !Sub
            - "#set($inputRoot = $input.path('$'))\n {\n \"StreamName\": \"${streamName}\",\n \"Data\": \"$util.base64Encode(\"$input.json('$')\")\",\n \"PartitionKey\" : $input.json('$.SessionId')}"
            - streamName: !Ref KinesisStream
        PassthroughBehavior: WHEN_NO_TEMPLATES
        IntegrationResponses:
          - StatusCode: '200'
      MethodResponses:
        - StatusCode: '200'
          ResponseModels:
            application/json: Empty
  EventsCORESOptionsMethod:
    Type: AWS::ApiGateway::Method
    Properties:
      AuthorizationType: NONE
      RestApiId: !Ref ApiGatewayEndpoint
      ResourceId: !Ref EventsResource
      HttpMethod: OPTIONS
      Integration:
        Type: MOCK
        IntegrationResponses:
          - StatusCode: '200'
            ResponseParameters:
              method.response.header.Access-Control-Allow-Headers: "'Content-Type,X-Amz-Date,Authorization,X-Api-Key,X-Amz-Security-Token'"
              method.response.header.Access-Control-Allow-Methods: "'POST,OPTIONS'"
              method.response.header.Access-Control-Allow-Origin: "'*'"
            ResponseTemplates:
              application/json: ''
        PassthroughBehavior: WHEN_NO_MATCH
        RequestTemplates:
          application/json: '{"statusCode": 200}'
      MethodResponses:
        - StatusCode: '200'
          ResponseModels:
            application/json: Empty
          ResponseParameters:
            method.response.header.Access-Control-Allow-Headers: false
            method.response.header.Access-Control-Allow-Methods: false
            method.response.header.Access-Control-Allow-Origin: false

  # APIGW to Kinesis Role
  APIGatewayKinesisRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service:
                - apigateway.amazonaws.com
            Action:
              - sts:AssumeRole
      Path: /
      Policies:
        - PolicyName: APIGatewayKinesisRolePolicy
          PolicyDocument:
            Version: '2012-10-17'
            Statement:
              - Effect: Allow
                Action:
                  - kinesis:PutRecord
                  - kinesis:PutRecords
                Resource: !GetAtt KinesisStream.Arn

  # Kinesis Delivery Role
  KinesisDeliveryRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        # Quoted: an unquoted 2012-10-17 is a date under YAML 1.1 typing.
        Version: '2012-10-17'
        Statement:
          - Sid: ''
            Effect: Allow
            Principal:
              Service: firehose.amazonaws.com
            Action: sts:AssumeRole
            Condition:
              StringEquals:
                sts:ExternalId: !Ref AWS::AccountId
  KinesisDeliveryPolicy:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: se-firehose_delivery_policy
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          # Write access to the destination bucket and the objects in it.
          - Effect: Allow
            Action:
              - s3:AbortMultipartUpload
              - s3:GetBucketLocation
              - s3:GetObject
              - s3:ListBucket
              - s3:ListBucketMultipartUploads
              - s3:PutObject
            Resource:
              - !Sub 'arn:aws:s3:::${S3BucketName.Value}'
              - !Sub 'arn:aws:s3:::${S3BucketName.Value}/*'
          # The ':' between the account ID and 'log-group' was missing before,
          # which produced an ARN that could not match any log stream.
          - Effect: Allow
            Action: logs:PutLogEvents
            Resource:
              - !Sub 'arn:aws:logs:${AWS::Region}:${AWS::AccountId}:log-group:/aws/kinesisfirehose/*:log-stream:*'
          # Read access to the source stream.
          - Effect: Allow
            Action:
              - kinesis:SubscribeToShard
              - kinesis:ListShards
              - kinesis:GetShardIterator
              - kinesis:GetRecords
              - kinesis:DescribeStream
              - kinesis:DescribeStreamSummary
            Resource: !GetAtt KinesisStream.Arn
      Roles:
        - !Ref KinesisDeliveryRole

  # Lambda Role
  # Shared by both Lambda functions in this template.
  LambdaRole:
    Type: AWS::IAM::Role
    Properties:
      AssumeRolePolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Principal:
              Service: lambda.amazonaws.com
            Action: sts:AssumeRole
      ManagedPolicyArns:
        - arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess
        - arn:aws:iam::aws:policy/CloudWatchFullAccess
  LambdaPolicies:
    Type: AWS::IAM::Policy
    Properties:
      PolicyName: se-lambda-execution-policy
      PolicyDocument:
        Version: '2012-10-17'
        Statement:
          - Effect: Allow
            Action:
              - personalize:*
            Resource: '*'
          - Effect: Allow
            Action:
              - kinesis:SubscribeToShard
              - kinesis:ListShards
              - kinesis:GetShardIterator
              - kinesis:GetRecords
              - kinesis:DescribeStream
              - kinesis:DescribeStreamSummary
            Resource: '*'
      Roles:
        - !Ref LambdaRole

# Outputs:
Outputs:
  POSTRecommendationsApiGatewayInvokeURL:
    Value: !Sub 'https://${ApiGatewayEndpoint}.execute-api.${AWS::Region}.amazonaws.com/dev2/recommendations'
  POSTEventsApiGatewayInvokeURL:
    Value: !Sub 'https://${ApiGatewayEndpoint}.execute-api.${AWS::Region}.amazonaws.com/dev2/events'
  # Console link to the generated API key (not the key value itself).
  APIKEY:
    Value: !Sub 'https://console.aws.amazon.com/apigateway/home?region=${AWS::Region}#/api-keys/${APIKey}'