AWSTemplateFormatVersion: 2010-09-09 Description: Creates the websocket api for real time personalization of live events # Parameters to be used through out the template Parameters: Stage: Type: String Default: production AWSRegion: Type: String Default: NA AWSPersonalizeCampaignARN: Type: String Default: NA AWSPersonalizeCampaignAWSRegion: Type: String Default: NA AWSPersonalizeCampaignNumberOfResults: Type: String Default: 10 Resources: # parameter for the Amazon Personalize campaign ARN AmazonPersonalizeCampaignARN: Type: AWS::SSM::Parameter Properties: Description: parameter for the Amazon Personalize campaign ARN Name: /rt_personalize/aws_personalize/campaign_arn Type: String Value: !Ref AWSPersonalizeCampaignARN # parameter for the Amazon Personalize number of results AmazonPersonalizeNumberOfResults: Type: AWS::SSM::Parameter Properties: Description: parameter for the Amazon Personalize number of results Name: /rt_personalize/aws_personalize/num_results Type: String Value: !Ref AWSPersonalizeCampaignNumberOfResults # parameter for the Amazon Personalize hosted regions AmazonPersonalizeHostedRegion: Type: AWS::SSM::Parameter Properties: Description: parameter for the Amazon Personalize hosted region Name: /rt_personalize/aws_personalize/region Type: String Value: !Ref AWSPersonalizeCampaignAWSRegion # table for live event manager, this table has ddb streams enabled driving lambda RTPersonalizeEventManager: Type: 'AWS::DynamoDB::Table' Properties: TableName: 'rt_personalize_event_manager' StreamSpecification: StreamViewType: NEW_AND_OLD_IMAGES KeySchema: - AttributeName: event_id KeyType: HASH AttributeDefinitions: - AttributeName: event_id AttributeType: S ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 UpdateReplacePolicy: Delete DeletionPolicy: Delete # table for managing user session stores, as they connect to web socket RTPersonalizeUserSessionStore: Type: 'AWS::DynamoDB::Table' Properties: TableName: 'rt_personalize_user_session_store' KeySchema: - AttributeName: session_id KeyType: HASH AttributeDefinitions: - AttributeName: session_id AttributeType: S ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 UpdateReplacePolicy: Delete DeletionPolicy: Delete # table to store the item (ux components to be used for the live event) RTPersonalizeSoccerPersonalizeItem: Type: 'AWS::DynamoDB::Table' Properties: TableName: 'rt_personalize_soccer_ux_item' KeySchema: - AttributeName: item_id KeyType: HASH AttributeDefinitions: - AttributeName: item_id AttributeType: S ProvisionedThroughput: ReadCapacityUnits: 5 WriteCapacityUnits: 5 UpdateReplacePolicy: Delete DeletionPolicy: Delete # service handler role rtPersonalizeEventManagerServiceHandlerRole: Type: 'AWS::IAM::Role' Properties: AssumeRolePolicyDocument: Statement: - Action: 'sts:AssumeRole' Effect: Allow Principal: Service: Version: 2012-10-17 ManagedPolicyArns: - !Join - '' - - 'arn:' - !Ref 'AWS::Partition' - ':iam::aws:policy/service-role/AWSLambdaBasicExecutionRole' # role policy # note: fine tune the policy as needed per least-privilege-principal rtPersonalizeEventManagerServiceHandlerRoleDefaultPolicy: Type: 'AWS::IAM::Policy' Properties: PolicyDocument: Statement: - Action: - 'dynamodb:BatchWriteItem' - 'dynamodb:PutItem' - 'dynamodb:UpdateItem' - 'dynamodb:DeleteItem' - 'dynamodb:DescribeStream' - 'dynamodb:GetRecords' - 'dynamodb:GetShardIterator' - 'dynamodb:Query' - 'dynamodb:GetItem' - 'dynamodb:Scan' - 'dynamodb:ListStreams' - 'ssm:PutParameter' - 'ssm:DeleteParameter' - 'ssm:GetParametersByPath' - 'ssm:GetParameters' - 'ssm:GetParameter' - 'ssm:DeleteParameters' - 'personalize:DescribeSolution' - 'personalize:DescribeCampaign' - 'personalize:GetRecommendations' Effect: Allow Resource: - !GetAtt RTPersonalizeEventManager.Arn - !GetAtt RTPersonalizeEventManager.StreamArn - !GetAtt RTPersonalizeUserSessionStore.Arn - !GetAtt RTPersonalizeSoccerPersonalizeItem.Arn Version: 2012-10-17 PolicyName: rtPersonalizeEventManagerServiceHandlerRoleDefaultPolicy Roles: - !Ref rtPersonalizeEventManagerServiceHandlerRole # lambda function to handle ddb streams rtPersonalizeDdbEventStreamHandler: Type: 'AWS::Lambda::Function' Properties: FunctionName: 'rt_personalize_ddb_event_stream_handler' Timeout: 30 Code: ZipFile: |- import json import boto3 def get_personalized_ux_components_by_criteria(user_id, context, num_results, campaign_arn=None, personalize_region='us-east-1'): try: personalize_runtime = boto3.client('personalize-runtime', personalize_region) response = personalize_runtime.get_recommendations( campaignArn=campaign_arn, userId=user_id, numResults=num_results, context={ 'MATCH_EVENT': context['MATCH_EVENT'], 'MATCH_TIMING': context['MATCH_TIMING'], 'SCORE_DIFFERENCE': context['SCORE_DIFFERENCE'], 'MATCH_TYPE': context['MATCH_TYPE'], 'MATCH_VENUE': context['MATCH_VENUE'] } ) item_list_collection = response['itemList'] except Exception as e: print('an exception has occurred...', str(e)) raise e return item_list_collection # improve this function to avoid full table scan def get_item_component_collection(): try: dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('rt_personalize_soccer_ux_item') response = table.scan() except Exception as e: print('an exception has occurred...', str(e)) raise e return response['Items'] def get_item_id_details(item_id, item_collection_master_set): for item_record in item_collection_master_set: if item_record['item_id'] == item_id: return item_record def moderate_component_recommendation(dispatch_response, item_collection_master_set): filtered_component_list = [] component = dict() try: for record in dispatch_response: item_id = record['itemId'] item = get_item_id_details(item_id, item_collection_master_set) component_type = item['item_type'] # before insert, check if this component type is already used. skip duplicate. if component_type not in component.keys(): component[component_type] = item_id except Exception as e: print('an exception has occurred...', str(e)) raise e return component def lambda_handler(event, context): status_code = 200 response_body = 'successfully handled event state change' num_results = 10 campaign_arn = None domain = None stage = None personalize_solution_region = None # load parameters from ssm ssm = boto3.client('ssm') personalize_num_result_param = ssm.get_parameter(Name='/rt_personalize/aws_personalize/num_results', WithDecryption=True) num_results = int(personalize_num_result_param['Parameter']['Value']) personalize_solution_region_param = ssm.get_parameter(Name='/rt_personalize/aws_personalize/region', WithDecryption=True) personalize_region = personalize_solution_region_param['Parameter']['Value'] personalize_cmpgn_arn_param = ssm.get_parameter(Name='/rt_personalize/aws_personalize/campaign_arn', WithDecryption=True) campaign_arn = personalize_cmpgn_arn_param['Parameter']['Value'] personalize_domain_param = ssm.get_parameter(Name='/rt_personalize/api/api_gateway/domain', WithDecryption=True) domain = personalize_domain_param['Parameter']['Value'] personalize_stage_param = ssm.get_parameter(Name='/rt_personalize/api/api_gateway/stage', WithDecryption=True) stage = personalize_stage_param['Parameter']['Value'] try: # get item master collection item_collection_master_set = get_item_component_collection() # get the connection id dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('rt_personalize_user_session_store') # doing table scan for this prototype. re-visit this for large datasets user_session_store_collection = table.scan() # send a response back to client with a success message endpoint = 'https://' + domain + '/' + stage api_client = boto3.client('apigatewaymanagementapi', endpoint_url=endpoint) # the context variables to drive personalization for a live event(e.g. - soccer) event_id = None match_timing = None match_event = None score_difference = None match_venue = None match_type = None # get what changed in the context for record in event['Records']: if record['eventName'] == 'MODIFY': event_id = next(iter((record['dynamodb']['NewImage']['event_id']).values())) match_timing = next(iter((record['dynamodb']['NewImage']['match_timing']).values())) match_event = next(iter((record['dynamodb']['NewImage']['match_event']).values())) score_difference = next(iter((record['dynamodb']['NewImage']['score_difference']).values())) match_venue = next(iter((record['dynamodb']['NewImage']['match_venue']).values())) match_type = next(iter((record['dynamodb']['NewImage']['match_type']).values())) # pack data for function call context = {} context['MATCH_EVENT'] = match_event context['MATCH_TIMING'] = match_timing context['SCORE_DIFFERENCE'] = score_difference context['MATCH_TYPE'] = match_type context['MATCH_VENUE'] = match_venue # get the set of active sessions for item in user_session_store_collection['Items']: connection_id = item['session_id'] consumer_id = item['consumer_id'] device_id = item['device_id'] content_id = item['content_id'] # call amazon personalize with the input data and context dispatch_response = get_personalized_ux_components_by_criteria(consumer_id, context, num_results, campaign_arn, personalize_region) # moderate component recommendation. we should not have multiple components of the same type moderated_response = moderate_component_recommendation(dispatch_response, item_collection_master_set) # dispatch to consumer over webso api_client.post_to_connection(ConnectionId=connection_id, Data=json.dumps(moderated_response)) except Exception as e: print('an exception has occurred...', str(e)) status_code = 500 response_body = 'an exception occurred while handling personalization' raise e return { 'statusCode': status_code, 'body': json.dumps(response_body) } Role: !GetAtt - rtPersonalizeEventManagerServiceHandlerRole - Arn Handler: index.lambda_handler Runtime: python3.7 DependsOn: - rtPersonalizeEventManagerServiceHandlerRoleDefaultPolicy - rtPersonalizeEventManagerServiceHandlerRole # lambda function for connect to web socket rtPersonalizeWebSocketConectHandler: Type: 'AWS::Lambda::Function' Properties: FunctionName: 'rt_personalize_websocket_connection_handler' Timeout: 30 Code: ZipFile: |- import json import boto3 def lambda_handler(event, context): status_code = 200 status_message = 'successfully processed client message...' connectionID = event['requestContext']['connectionId'] try: dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('rt_personalize_user_session_store') response = table.put_item( Item={'session_id': str(connectionID) }) # update param store for subsequent calls api_endpoint_domain = event['requestContext']['domainName'] api_endpoint_stage = event['requestContext']['stage'] ssm = boto3.client('ssm') response = ssm.put_parameter(Name='/rt_personalize/api/api_gateway/domain', Value=api_endpoint_domain, Type='String', Overwrite=True) response = ssm.put_parameter(Name='/rt_personalize/api/api_gateway/stage', Value=api_endpoint_stage, Type='String', Overwrite=True) except Exception as e: print('exception occurred while db insert ' + str(e)) status_code = 500 status_message = 'failed to process client message' raise e return { 'statusCode': status_code, 'body': json.dumps(status_message) } Role: !GetAtt - rtPersonalizeEventManagerServiceHandlerRole - Arn Handler: index.lambda_handler Runtime: python3.7 DependsOn: - rtPersonalizeEventManagerServiceHandlerRoleDefaultPolicy - rtPersonalizeEventManagerServiceHandlerRole # lambda function to send message over a websocket rtPersonalizeWebSocketSendMessageHandler: Type: 'AWS::Lambda::Function' Properties: FunctionName: 'rt_personalize_websocket_sendmessage_handler' Timeout: 30 Code: ZipFile: |- import json import boto3 def lambda_handler(event, context): status_code = 200 status_message = 'successfully processed client message...' try: connection_id = event['requestContext']['connectionId'] # get customer id from input message message = json.loads(event['body']) consumer_id = message['consumer_id'] device_id = message['device_id'] content_id = message['content_id'] # update customer id based on the partition key of session_id dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('rt_personalize_user_session_store') resp = table.update_item(Key={'session_id': connection_id}, UpdateExpression="set consumer_id = :cns, device_id = :dvc, content_id = :cnt ", ExpressionAttributeValues={':cns': consumer_id, ':dvc': device_id, ':cnt': content_id }, ReturnValues="UPDATED_NEW") # send a response back to client with a success message endpoint = 'https://' + event['requestContext']['domainName'] + '/' + event['requestContext']['stage'] api_client = boto3.client('apigatewaymanagementapi', endpoint_url=endpoint) status = 'success' api_client.post_to_connection(ConnectionId=connection_id, Data=json.dumps({'status': status})) except Exception as e: print('exception occurred while db insert ' + str(e)) status_code = 500 status_message = 'failed to process client message' raise e return { 'statusCode': status_code, 'body': json.dumps(status_message) } Role: !GetAtt - rtPersonalizeEventManagerServiceHandlerRole - Arn Handler: index.lambda_handler Runtime: python3.7 DependsOn: - rtPersonalizeEventManagerServiceHandlerRoleDefaultPolicy - rtPersonalizeEventManagerServiceHandlerRole # lambda function for disconnect rtPersonalizeWebSocketDisconnectHandler: Type: 'AWS::Lambda::Function' Properties: FunctionName: 'rt_personalize_websocket_disconnect_handler' Timeout: 30 Code: ZipFile: |- import json import boto3 def lambda_handler(event, context): status_code = 200 body = 'successfully deleted session on disconnect' try: connection_id = event['requestContext']['connectionId'] # delete the session id dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('rt_personalize_user_session_store') response = table.delete_item( Key={ 'session_id': connection_id } ) except Exception as e: print('an exception has occurred ' + str(e)) body = 'failed to delete session id on delete' status_code = 500 raise e return { 'statusCode': status_code, 'body': json.dumps(body) } Role: !GetAtt - rtPersonalizeEventManagerServiceHandlerRole - Arn Handler: index.lambda_handler Runtime: python3.7 DependsOn: - rtPersonalizeEventManagerServiceHandlerRoleDefaultPolicy - rtPersonalizeEventManagerServiceHandlerRole # lambda function for default rtPersonalizeWebSocketDefaultHandler: Type: 'AWS::Lambda::Function' Properties: FunctionName: 'rt_personalize_websocket_default_handler' Code: ZipFile: |- import json import boto3 def lambda_handler(event, context): status_code = 200 response_body = 'successfully handled default route' try: connection_id = event['requestContext']['connectionId'] # send a response back to client with a success message endpoint = 'https://' + event['requestContext']['domainName'] + '/' + event['requestContext']['stage'] api_client = boto3.client('apigatewaymanagementapi', endpoint_url=endpoint) message_body = 'this is default route. please use right action key' # dispatch to consumer over websocket api_client.post_to_connection(ConnectionId=connection_id, Data=json.dumps(message_body)) except Exception as e: print('an exception has occurred ' + str(e)) response_body = 'an exception ocurred while handling default route' status_code = 500 raise e return { 'statusCode': status_code, 'body': json.dumps(response_body) } Role: !GetAtt - rtPersonalizeEventManagerServiceHandlerRole - Arn Handler: index.lambda_handler Runtime: python3.7 DependsOn: - rtPersonalizeEventManagerServiceHandlerRoleDefaultPolicy - rtPersonalizeEventManagerServiceHandlerRole # lambda function for sendmessage(custom route) rtPersonalizeWebSocketSendMessageHandler: Type: 'AWS::Lambda::Function' Properties: FunctionName: 'rt_personalize_websocket_sendmessage_handler' Code: ZipFile: |- import json import boto3 def lambda_handler(event, context): status_code = 200 status_message = 'successfully processed client message...' try: connection_id = event['requestContext']['connectionId'] # get customer id from input message message = json.loads(event['body']) consumer_id = message['consumer_id'] device_id = message['device_id'] content_id = message['content_id'] # update customer id based on the partition key of session_id dynamodb = boto3.resource('dynamodb') table = dynamodb.Table('rt_personalize_user_session_store') resp = table.update_item(Key={'session_id': connection_id}, UpdateExpression="set consumer_id = :cns, device_id = :dvc, content_id = :cnt ", ExpressionAttributeValues={':cns': consumer_id, ':dvc': device_id, ':cnt': content_id }, ReturnValues="UPDATED_NEW") # send a response back to client with a success message endpoint = 'https://' + event['requestContext']['domainName'] + '/' + event['requestContext']['stage'] api_client = boto3.client('apigatewaymanagementapi', endpoint_url=endpoint) status = 'success' api_client.post_to_connection(ConnectionId=connection_id, Data=json.dumps({'status': status})) except Exception as e: print('exception occurred while db insert ' + str(e)) status_code = 500 status_message = 'failed to process client message' raise e return { 'statusCode': status_code, 'body': json.dumps(status_message) } Role: !GetAtt - rtPersonalizeEventManagerServiceHandlerRole - Arn Handler: index.lambda_handler Runtime: python3.7 DependsOn: - rtPersonalizeEventManagerServiceHandlerRoleDefaultPolicy - rtPersonalizeEventManagerServiceHandlerRole # event mapping between the ddb stream and lambda function EventMapping: Type: AWS::Lambda::EventSourceMapping Properties: EventSourceArn: !GetAtt RTPersonalizeEventManager.StreamArn FunctionName: !GetAtt rtPersonalizeDdbEventStreamHandler.Arn StartingPosition: 'LATEST' # api gateway rtPersonalizationWebSocketApiGateway: Type: 'AWS::ApiGatewayV2::Api' Properties: Name: rt_personalization_websocket_api_gateway ProtocolType: WEBSOCKET RouteSelectionExpression: $request.body.action # rt personalization api gateway integration persmission rtPersonalizeWebSocketConnectIntegrationPermission: Type: 'AWS::Lambda::Permission' Properties: Action: 'lambda:InvokeFunction' FunctionName: !GetAtt - rtPersonalizeWebSocketConectHandler - Arn Principal: SourceArn: !Join - '' - - 'arn:' - !Ref 'AWS::Partition' - ':execute-api:' - !Ref 'AWS::Region' - ':' - !Ref 'AWS::AccountId' - ':' - !Ref rtPersonalizationWebSocketApiGateway - /*/*$connect # rt personalize connect route integration rtPersonalizeConnectRouteIntegration: Type: 'AWS::ApiGatewayV2::Integration' Properties: ApiId: !Ref rtPersonalizationWebSocketApiGateway IntegrationType: AWS_PROXY IntegrationUri: !Join - '' - - 'arn:' - !Ref 'AWS::Partition' - ':apigateway:' - !Ref 'AWS::Region' - ':lambda:path/2015-03-31/functions/' - !GetAtt - rtPersonalizeWebSocketConectHandler - Arn - /invocations # rt personalize connect route rtPersonalizeConnectRoute: Type: 'AWS::ApiGatewayV2::Route' Properties: ApiId: !Ref rtPersonalizationWebSocketApiGateway RouteKey: $connect AuthorizationType: NONE Target: !Join - '' - - integrations/ - !Ref rtPersonalizeConnectRouteIntegration # rt personalization api gateway integration persmission disconnect rtPersonalizeWebSocketDisconnectIntegrationPermission: Type: 'AWS::Lambda::Permission' Properties: Action: 'lambda:InvokeFunction' FunctionName: !GetAtt - rtPersonalizeWebSocketDisconnectHandler - Arn Principal: SourceArn: !Join - '' - - 'arn:' - !Ref 'AWS::Partition' - ':execute-api:' - !Ref 'AWS::Region' - ':' - !Ref 'AWS::AccountId' - ':' - !Ref rtPersonalizationWebSocketApiGateway - /*/*$disconnect # rt personalize disconnect route integration rtPersonalizeDisconnectRouteIntegration: Type: 'AWS::ApiGatewayV2::Integration' Properties: ApiId: !Ref rtPersonalizationWebSocketApiGateway IntegrationType: AWS_PROXY IntegrationUri: !Join - '' - - 'arn:' - !Ref 'AWS::Partition' - ':apigateway:' - !Ref 'AWS::Region' - ':lambda:path/2015-03-31/functions/' - !GetAtt - rtPersonalizeWebSocketDisconnectHandler - Arn - /invocations # rt personalize disconnect route rtPersonalizeDisconnectRoute: Type: 'AWS::ApiGatewayV2::Route' Properties: ApiId: !Ref rtPersonalizationWebSocketApiGateway RouteKey: $disconnect AuthorizationType: NONE Target: !Join - '' - - integrations/ - !Ref rtPersonalizeDisconnectRouteIntegration # rt personalization api gateway integration persmission default rtPersonalizeWebSocketDefaultIntegrationPermission: Type: 'AWS::Lambda::Permission' Properties: Action: 'lambda:InvokeFunction' FunctionName: !GetAtt - rtPersonalizeWebSocketDefaultHandler - Arn Principal: SourceArn: !Join - '' - - 'arn:' - !Ref 'AWS::Partition' - ':execute-api:' - !Ref 'AWS::Region' - ':' - !Ref 'AWS::AccountId' - ':' - !Ref rtPersonalizationWebSocketApiGateway - /*/*$default # rt personalize deault route integration rtPersonalizeDefaultRouteIntegration: Type: 'AWS::ApiGatewayV2::Integration' Properties: ApiId: !Ref rtPersonalizationWebSocketApiGateway IntegrationType: AWS_PROXY IntegrationUri: !Join - '' - - 'arn:' - !Ref 'AWS::Partition' - ':apigateway:' - !Ref 'AWS::Region' - ':lambda:path/2015-03-31/functions/' - !GetAtt - rtPersonalizeWebSocketDefaultHandler - Arn - /invocations # rt personalize default route rtPersonalizeDefaultRoute: Type: 'AWS::ApiGatewayV2::Route' Properties: ApiId: !Ref rtPersonalizationWebSocketApiGateway RouteKey: $default AuthorizationType: NONE Target: !Join - '' - - integrations/ - !Ref rtPersonalizeDefaultRouteIntegration # rt personalization api gateway integration persmission sendmessage rtPersonalizeWebSocketSendMessasgeIntegrationPermission: Type: 'AWS::Lambda::Permission' Properties: Action: 'lambda:InvokeFunction' FunctionName: !GetAtt - rtPersonalizeWebSocketSendMessageHandler - Arn Principal: SourceArn: !Join - '' - - 'arn:' - !Ref 'AWS::Partition' - ':execute-api:' - !Ref 'AWS::Region' - ':' - !Ref 'AWS::AccountId' - ':' - !Ref rtPersonalizationWebSocketApiGateway - /*/*sendmessage # rt personalize sendmessage route integration rtPersonalizeSendMessageIntegration: Type: 'AWS::ApiGatewayV2::Integration' Properties: ApiId: !Ref rtPersonalizationWebSocketApiGateway IntegrationType: AWS_PROXY IntegrationUri: !Join - '' - - 'arn:' - !Ref 'AWS::Partition' - ':apigateway:' - !Ref 'AWS::Region' - ':lambda:path/2015-03-31/functions/' - !GetAtt - rtPersonalizeWebSocketSendMessageHandler - Arn - /invocations # rt personalize sendmessage route rtPersonalizeSendMessageRoute: Type: 'AWS::ApiGatewayV2::Route' Properties: ApiId: !Ref rtPersonalizationWebSocketApiGateway RouteKey: sendmessage AuthorizationType: NONE Target: !Join - '' - - integrations/ - !Ref rtPersonalizeSendMessageIntegration # rt personalize api gateway stage rtPersonalizeApiGatewayStage: Type: 'AWS::ApiGatewayV2::Stage' Properties: ApiId: !Ref rtPersonalizationWebSocketApiGateway StageName: dev AutoDeploy: true # manage connections policy rtPersonalizeManageConnectionPolicy: Type: 'AWS::IAM::Policy' Properties: PolicyDocument: Statement: - Action: 'execute-api:ManageConnections' Effect: Allow Resource: !Join - '' - - 'arn:aws:execute-api:' - !Ref 'AWS::Region' - ':' - !Ref 'AWS::AccountId' - ':' - !Ref rtPersonalizationWebSocketApiGateway - /dev/POST/@connections/* Version: 2012-10-17 PolicyName: rtPersonalizeManageConnectionPolicy Roles: - !Ref rtPersonalizeEventManagerServiceHandlerRole # output the api gateway url Outputs: apiGatewayInvokeURL: Value: !Sub wss://${rtPersonalizationWebSocketApiGateway}.execute-api.${AWS::Region}${rtPersonalizeApiGatewayStage}