# Notebook 4: Realtime Recommendations

Specify the "Python 3" kernel and the "Data Science" image. Set the instance type to ml.t3.medium (default) for this notebook.

### Background

In this notebook, we will get personalized product recommendations using the Collaborative Filtering model and the Ranking model we deployed in notebook 2.

We'll do this by:

1. Using the deployed Collaborative Filtering model, which takes the payload that we cached in notebook 2 (to address the cold start problem) as input and generates candidate products based on user-item interactions.
2. Feeding these candidate products into our Ranking model along with click stream data from our `click stream` Feature Group. You'll see that the real-time click stream data will directly influence the ranking of the candidate products at the time of inference.
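
The two steps above follow a common "generate candidates, then re-rank" pattern. The sketch below illustrates the idea with plain Python and made-up data. The product names, scores, and the `two_stage_recommend` helper are hypothetical stand-ins and not the models deployed in this workshop.

```python
from typing import Callable, Dict, List


def two_stage_recommend(
    cf_scores: Dict[str, float],
    rank_fn: Callable[[str], float],
    n_candidates: int,
    n_final: int,
) -> List[str]:
    """Pick candidates with one score, then re-order them with another."""
    # Stage 1: keep the products the first model scores highest.
    candidates = sorted(cf_scores, key=cf_scores.get, reverse=True)[:n_candidates]
    # Stage 2: re-order only those candidates using a second, real-time signal.
    reranked = sorted(candidates, key=rank_fn, reverse=True)
    return reranked[:n_final]


# Hypothetical candidate scores (stage 1) and a health index per product.
cf_scores = {"apple": 0.2, "soda": 0.9, "salad": 0.5, "chips": 0.8}
health_index = {"apple": 0.9, "soda": 0.1, "salad": 0.8, "chips": 0.2}

# A customer who has been browsing healthy products: rank by health index.
top_products = two_stage_recommend(cf_scores, health_index.get, n_candidates=3, n_final=2)
print(top_products)
```

Note that "apple" never reaches the second stage here: the ranking step can only re-order what the candidate step returns.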

<img src="./img/inference-arch.png" alt="Inference arch" style="width: 800px;"/>

### Package updates

In [None]:
!pip install --upgrade pandas

### Imports

In [None]:
import sagemaker
from sagemaker import get_execution_role
import boto3
import json
import numpy as np
import pandas as pd
from utils import *
from sagemaker.serializers import CSVSerializer
from sagemaker.deserializers import JSONDeserializer
import random
from parameter_store import ParameterStore
import time

### Session variables

In [None]:
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
default_bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name
featurestore_runtime = boto3.client(service_name='sagemaker-featurestore-runtime', region_name=region)
ps = ParameterStore(verbose=False)
ps.set_namespace('feature-store-workshop')

In [None]:
query_results = 'sagemaker-recsys-featurestore-workshop'
prefix = 'recsys-feature-store'

Load variables from previous notebooks.

In [None]:
parameters = ps.read()
customers_feature_group_name = parameters['customers_feature_group_name']
products_feature_group_name = parameters['products_feature_group_name']
orders_feature_group_name = parameters['orders_feature_group_name']
click_stream_historical_feature_group_name = parameters['click_stream_historical_feature_group_name']
click_stream_feature_group_name = parameters['click_stream_feature_group_name']

customers_table = parameters['customers_table']
products_table = parameters['products_table']
orders_table = parameters['orders_table']
click_stream_historical_table = parameters['click_stream_historical_table']
click_stream_table = parameters['click_stream_table']

train_data_location = parameters['train_data_location']
test_data_location = parameters['test_data_location']

cf_model_endpoint_name = parameters['cf_model_endpoint_name']
ranking_model_endpoint_name = parameters['ranking_model_endpoint_name']

customer_id = parameters['inference_customer_id']

%store -r

### Get unranked recommended products from the Collaborative Filtering model

Create a `Predictor` object from our collaborative filtering model endpoint (which we deployed in notebook 2) so that we can use it to make predictions.

In [None]:
# Make sure the model has finished deploying (wait until the endpoint is InService)
existing_endpoints = sagemaker_session.sagemaker_client.list_endpoints(NameContains=cf_model_endpoint_name, StatusEquals='InService', MaxResults=30)["Endpoints"]
while not existing_endpoints:
    time.sleep(60)
    existing_endpoints = sagemaker_session.sagemaker_client.list_endpoints(NameContains=cf_model_endpoint_name, StatusEquals='InService', MaxResults=30)["Endpoints"]

cf_model_predictor = sagemaker.predictor.Predictor(endpoint_name=cf_model_endpoint_name, 
                                                   sagemaker_session=sagemaker_session,
                                                   serializer=FMSerializer(),
                                                   deserializer=JSONDeserializer())
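
The wait loop above polls until the endpoint is ready. A more general polling helper, with a failure check and an upper bound on the number of attempts, could look like the sketch below. `wait_for_status` and its parameters are hypothetical names introduced for illustration. In this notebook, `get_status` could wrap `describe_endpoint(EndpointName=...)["EndpointStatus"]`, keeping in mind that `describe_endpoint` raises an error if the endpoint does not exist yet. Here the helper is exercised with a fake status sequence so that it runs without AWS access.

```python
import time
from typing import Callable


def wait_for_status(
    get_status: Callable[[], str],
    target: str = "InService",
    failed: str = "Failed",
    poll_seconds: float = 60.0,
    max_polls: int = 30,
) -> str:
    """Poll get_status() until it returns `target`, fails, or times out."""
    for _ in range(max_polls):
        status = get_status()
        if status == target:
            return status
        if status == failed:
            # Stop early instead of waiting on an endpoint that will never come up.
            raise RuntimeError(f"Endpoint entered status {failed!r}")
        time.sleep(poll_seconds)
    raise TimeoutError(f"Status did not reach {target!r} after {max_polls} polls")


# Fake status source standing in for a real endpoint (assumption for the demo).
fake_statuses = iter(["Creating", "Creating", "InService"])
final_status = wait_for_status(lambda: next(fake_statuses), poll_seconds=0)
print(final_status)
```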

In [None]:
# Pass in our cached data as input to the Collaborative Filtering model
predictions = cf_model_predictor.predict(cf_inference_payload)['predictions']

# Add those predictions to the input DataFrame
predictions = [prediction["score"] for prediction in predictions]
cf_inference_df['predictions'] = predictions

# Sort by predictions and take top 10
cf_inference_df = cf_inference_df.sort_values(by='predictions', ascending=False).head(10).reset_index()

Let's take a look at the output from our collaborative filtering model. These unranked products are recommended to the customer based on their purchase history.

In [None]:
cf_inference_df

These products aren't yet personalized to the customer's current session, which is what we'll use the ranking model for. The ranking model takes into account the customer's current behavior on the website, which influences the ranking of the recommended products.

### Rank the recommended products using the Ranking model

Create a `Predictor` object from our ranking model endpoint (which we deployed in notebook 2) so that we can use it to make predictions.

In [None]:
# Make sure the model has finished deploying (wait until the endpoint is InService)
existing_endpoints = sagemaker_session.sagemaker_client.list_endpoints(NameContains=ranking_model_endpoint_name, StatusEquals='InService', MaxResults=30)["Endpoints"]
while not existing_endpoints:
    time.sleep(60)
    existing_endpoints = sagemaker_session.sagemaker_client.list_endpoints(NameContains=ranking_model_endpoint_name, StatusEquals='InService', MaxResults=30)["Endpoints"]

ranking_model_predictor = sagemaker.predictor.Predictor(endpoint_name=ranking_model_endpoint_name, 
                                                        sagemaker_session=sagemaker_session,
                                                        serializer = CSVSerializer())

In order to construct the input for the ranking model, we need to one-hot encode product categories as we did in training. Ideally, these one-hot encoded products would be cached somewhere (perhaps in SageMaker Feature Store or another AWS service) but we're keeping it simple for now.

In [None]:
query = f'''
select product_category
from "{products_table}"
order by product_category
'''
product_categories_df, query = query_offline_store(products_feature_group_name, query,
                                                   sagemaker_session)
one_hot_cat_features = product_categories_df.product_category.unique()

df_one_hot_cat_features = pd.DataFrame(one_hot_cat_features)
df_one_hot_cat_features.columns = ['product_category']

df_one_hot_cat_features = pd.concat([df_one_hot_cat_features, pd.get_dummies(df_one_hot_cat_features['product_category'], prefix='cat')],axis=1)
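
To make the encoding step concrete, here is a small sketch of the same idea on made-up data: build a one-hot lookup table from the full list of categories, then left-join it onto a handful of candidate products. The category names and product IDs are hypothetical. Building the lookup from every category, rather than only those among the candidates, keeps the set and order of `cat_*` columns fixed, which matters if the model expects the same columns it saw in training.

```python
import pandas as pd

# Lookup table: one row per known category plus its one-hot columns.
categories = pd.DataFrame({"product_category": ["bakery", "dairy", "produce"]})
one_hot_lookup = pd.concat(
    [categories, pd.get_dummies(categories["product_category"], prefix="cat")],
    axis=1,
)

# Candidate products that only cover a subset of the categories.
candidates = pd.DataFrame(
    {"product_id": ["p1", "p2"], "product_category": ["produce", "bakery"]}
)

# The left join attaches every cat_* column, including cat_dairy,
# even though no candidate belongs to that category.
encoded = candidates.merge(one_hot_lookup, on="product_category", how="left")
print(encoded)
```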

Now we create a function to take the output from the collaborative filtering model and join it with the one-hot encoded product categories _**AND**_ the real-time click stream data from our `click stream` Feature Group, as this data will influence the ranking of recommended products. You can imagine the ranking inputs like so:

<img src="./img/vector-input-for-ranking.png" alt="Ranking input" style="width: 600px;"/>


We'll use this function in the next section.

In [None]:
def get_ranking_model_input_data(df, df_one_hot_cat_features):
    product_category_list = []
    product_health_index_list = []
    
    customer_id = df.iloc[0]['customer_id']
    # Get customer features from customers_feature_group_name
    customer_record = featurestore_runtime.get_record(FeatureGroupName=customers_feature_group_name,
                                                      RecordIdentifierValueAsString=customer_id,
                                                      FeatureNames=['customer_health_index'])
    
    customer_health_index = customer_record['Record'][0]['ValueAsString']
    
    # Get product features (instead of looping, you can optionally use
    # the `batch_get_record` Feature Store API)
    for index, row_tuple in df.iterrows():
        
        product_id = row_tuple['product_id']
        
        # Get product features from products_feature_group_name
        product_record = featurestore_runtime.get_record(FeatureGroupName=products_feature_group_name,
                                                         RecordIdentifierValueAsString=product_id,
                                                         FeatureNames=['product_category',
                                                                       'product_health_index'])
        
        product_category = product_record['Record'][0]['ValueAsString']
        product_health_index = product_record['Record'][1]['ValueAsString']
        
        product_category_list.append(product_category)
        product_health_index_list.append(product_health_index)

        

    # Get click stream features from click_stream_feature_group_name
    click_stream_record = featurestore_runtime.get_record(FeatureGroupName=click_stream_feature_group_name,
                                                          RecordIdentifierValueAsString=customer_id,
                                                          FeatureNames=['sum_activity_weight_last_2m',
                                                                  'avg_product_health_index_last_2m'])
    
    # Calculate healthy_activity_last_2m as this will influence ranking as well
    sum_activity_weight_last_2m = click_stream_record['Record'][0]['ValueAsString']
    avg_product_health_index_last_2m = click_stream_record['Record'][1]['ValueAsString']
    healthy_activity_last_2m = int(sum_activity_weight_last_2m) * float(avg_product_health_index_last_2m)

    data = {'healthy_activity_last_2m': healthy_activity_last_2m,
            'product_health_index': product_health_index_list,
            'customer_health_index': customer_health_index,
            'product_category': product_category_list}
    
    ranking_inference_df = pd.DataFrame(data)
    ranking_inference_df = ranking_inference_df.merge(df_one_hot_cat_features, on='product_category',
                                                      how='left')
    del ranking_inference_df['product_category']

    return ranking_inference_df
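
The function above reads each feature by its position in the `Record` list. An alternative that does not depend on the order of the returned features is to key the values by `FeatureName`, as in the sketch below. `record_to_dict` is a hypothetical helper and the sample response uses made-up values. It only mirrors the `FeatureName` / `ValueAsString` shape that the code above already relies on.

```python
from typing import Dict, List


def record_to_dict(record: List[Dict[str, str]]) -> Dict[str, str]:
    """Map each feature name in a Feature Store record to its string value."""
    return {feature["FeatureName"]: feature["ValueAsString"] for feature in record}


# Made-up response, deliberately listed in a different order than requested.
sample_response = {
    "Record": [
        {"FeatureName": "avg_product_health_index_last_2m", "ValueAsString": "0.25"},
        {"FeatureName": "sum_activity_weight_last_2m", "ValueAsString": "8"},
    ]
}

features = record_to_dict(sample_response["Record"])

# Same calculation as in get_ranking_model_input_data, but looked up by name.
healthy_activity_last_2m = (
    int(features["sum_activity_weight_last_2m"])
    * float(features["avg_product_health_index_last_2m"])
)
print(healthy_activity_last_2m)
```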

### Real-time personalized product recommendations

Let's finally put everything together by calling the function we created above to get real-time personalized product recommendations using data that's being streamed to SageMaker Feature Store to influence ranking.

Remember that we already have our unranked recommended products from our collaborative filtering model.

In [None]:
cf_inference_df

Now we need to rank those product recommendations.

In [None]:
# Construct input data for the ranking model
ranking_inference_df = get_ranking_model_input_data(cf_inference_df, df_one_hot_cat_features)

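# Get our ranked product recommendations and attach the predictions to the model input.
# Convert the CSV response to floats so that sorting by score is numeric, not lexicographic.
raw_predictions = ranking_model_predictor.predict(ranking_inference_df.to_numpy()).decode('utf-8')
ranking_inference_df['propensity_to_buy'] = [float(score) for score in raw_predictions.split(',')]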
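
The ranking endpoint returns its scores as a CSV-encoded byte string. The sketch below shows one way to turn such a response into floats, and why that matters when the scores are later sorted. `parse_scores` is a hypothetical helper and the byte strings are made-up examples, not real endpoint output.

```python
from typing import List


def parse_scores(response_body: bytes) -> List[float]:
    """Decode a CSV response body into a list of float scores."""
    text = response_body.decode("utf-8")
    # Accept commas or newlines as separators and skip empty tokens.
    tokens = text.replace("\n", ",").split(",")
    return [float(token) for token in tokens if token.strip()]


scores = parse_scores(b"0.91,0.07,0.5\n")
print(scores)

# Sorting score strings compares characters, not magnitudes.
raw = ["0.5", "9e-05"]
string_order = sorted(raw, reverse=True)
numeric_order = sorted(raw, key=float, reverse=True)
print(string_order, numeric_order)
```

Sorted as strings, the tiny score `9e-05` lands ahead of `0.5`. Sorted as numbers, it lands last.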

Now that we have our personalized ranked recommendations, let's see what the top 5 recommended products are.

In [None]:
# Join all the data back together for inspection
personalized_recommendations = pd.concat([cf_inference_df[['customer_id', 'product_id', 'product_name']],
                                          ranking_inference_df[['propensity_to_buy']]], axis=1)

# And sort by propensity to buy
personalized_recommendations.sort_values(by='propensity_to_buy', ascending=False)[['product_id','product_name']].reset_index(drop=True).head(5)

<div class="alert alert-info"> 💡 <strong> Personalized recommendations </strong>
Note how the ranking changed based on the customer's last 2 minutes of activity on the website.
</div>

### Congratulations! 🎉

You've officially built a recommendation engine system leveraging SageMaker Feature Store as a way to both train the recommendation engine models and influence recommendations in real-time! 🎉 💪

### (Optional)

Now that you've seen real-time recommendations for a customer interacting with unhealthy products, you can repeat this experiment to simulate healthy product interactions by ingesting new data into the stream. If you'd like to do this, follow the steps below:

1. Go to notebook 3 and navigate to the last section.
2. Replace `put_records_in_kinesis_stream(inference_customer_id, 0.1, 0.3)` with `put_records_in_kinesis_stream(inference_customer_id, 0.7, 0.9)`.
3. Wait for the data to be ingested into the Kinesis Data Stream.
4. Re-run the "Real-time personalized product recommendations" section in this notebook.

Go back to Workshop Studio and click on "Next".