# Notebook 2: Recommendation Engine Models

Use the "Python 3" kernel with the "Data Science" image, and set the instance type to ml.t3.medium (the default) for this notebook.

### Background

In this notebook, we'll be building two models: a collaborative filtering model using SageMaker's built-in Factorization Machines and a ranking model leveraging SageMaker's built-in XGBoost.

The collaborative filtering model will recommend products based on historical user-product interaction.

The ranking model will rerank the recommended products from the collaborative filtering model by taking the user's click-stream activity and using that to make personalized recommendations.

We'll put these two models together to build a recommendation engine.

For example, imagine a user is shopping around on a website and visits a "hot fudge" product. We'll want to fetch related items and sort them by the user's recent activity.

This notebook should take ~20 minutes to run.

### Imports

In [None]:
import sagemaker
import sagemaker.amazon.common as smac
from sagemaker import get_execution_role
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.serializers import CSVSerializer
from sagemaker.inputs import TrainingInput
import boto3
import io
import os  # used below when building S3 object keys
import json
import numpy as np
import pandas as pd
from sagemaker.deserializers import JSONDeserializer

from utils import *  # expected to provide helpers used below, e.g. query_offline_store and FMSerializer
from scipy.sparse import hstack
from sklearn.preprocessing import OneHotEncoder
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.model_selection import train_test_split
from time import gmtime, strftime, sleep, time
from parameter_store import ParameterStore

### Session variables

In [None]:
role = sagemaker.get_execution_role()
sagemaker_session = sagemaker.Session()
default_bucket = sagemaker_session.default_bucket()
region = sagemaker_session.boto_region_name
s3_client = boto3.client('s3', region_name=region)
featurestore_runtime = boto3.client(service_name='sagemaker-featurestore-runtime',
                                    region_name=region)
ps = ParameterStore(verbose=False)
ps.set_namespace('feature-store-workshop')

In [None]:
# CF model variables
prefix = 'recsys'
train_key = 'train.protobuf'
train_prefix = f'{prefix}/train'
test_key = 'test.protobuf'
test_prefix = f'{prefix}/test'
output_prefix = f's3://{default_bucket}/{prefix}/output'

# Other variables used in notebook
current_timestamp = strftime('%m-%d-%H-%M', gmtime())
query_results = 'sagemaker-recsys-featurestore-workshop'
# Note: `prefix` is reassigned here; the CF paths above were already built from 'recsys'
prefix = 'recsys-feature-store'
cf_model_endpoint_name = f'recsys-cf-model-{current_timestamp}'
ranking_model_endpoint_name = f'recsys-rerank-model-{current_timestamp}'

# Add variables to be saved for later notebooks
ps.add({'cf_model_endpoint_name': cf_model_endpoint_name,
        'ranking_model_endpoint_name': ranking_model_endpoint_name})

Load variables from the previous notebook.

In [None]:
parameters = ps.read()

customers_feature_group_name = parameters['customers_feature_group_name']
products_feature_group_name = parameters['products_feature_group_name']
orders_feature_group_name = parameters['orders_feature_group_name']
click_stream_historical_feature_group_name = parameters['click_stream_historical_feature_group_name']
click_stream_feature_group_name = parameters['click_stream_feature_group_name']

customers_table = parameters['customers_table']
products_table = parameters['products_table']
orders_table = parameters['orders_table']
click_stream_historical_table = parameters['click_stream_historical_table']
click_stream_table = parameters['click_stream_table']

### Query Feature Store for Collaborative Filtering model training data

Before we train our collaborative filtering model, we need data.

Now that we have our data in the Feature Store, let's query the offline store (across multiple `FeatureGroups` that we created in the previous notebook) to get the data we'll need to train our collaborative filtering model.

In [None]:
query = f'''
select click_stream_customers.customer_id,
       products.product_id,
       rating,
       state,
       age,
       is_married,
       product_name
from (
    select c.customer_id,
           cs.product_id,
           cs.bought,
           cs.rating,
           c.state,
           c.age,
           c.is_married
    from "{click_stream_historical_table}" as cs
    left join "{customers_table}" as c
    on cs.customer_id = c.customer_id
) click_stream_customers
left join
(select * from "{products_table}") products
on click_stream_customers.product_id = products.product_id
where click_stream_customers.bought = 1
'''

df_cf_features, query = query_offline_store(click_stream_feature_group_name, query,
                                            sagemaker_session)
df_cf_features.head()

The offline store includes metadata columns that you can use to filter out duplicate records (the offline store is versioned, so each write appends a new row) and deleted records (deleting a record doesn't remove its rows; instead, its `is_deleted` metadata column is set to `True`).

We skip that filtering here to keep the query readable, but you can find examples in our [docs](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-athena-glue-integration.html).
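If you'd rather deduplicate client-side, here is a minimal pandas sketch. It assumes you've added the offline store's `write_time` and `is_deleted` metadata columns, plus the feature group's event-time feature, to the SELECT list and that `is_deleted` is parsed as a boolean column; the event-time column name `event_time` and the toy rows are hypothetical. It keeps the newest version of each record and then drops records whose newest version is a delete marker.

```python
import pandas as pd


def latest_non_deleted(df: pd.DataFrame, record_id: str, event_time: str = "event_time") -> pd.DataFrame:
    """Keep the newest version of each record and drop deleted ones.

    Assumes `df` has the offline store metadata columns `write_time` and
    `is_deleted` (bool); `event_time` is a hypothetical column name.
    """
    # Sort so the newest version of each record (by event time, then write time) comes last
    ordered = df.sort_values([record_id, event_time, "write_time"])
    # Keep only that newest version per record identifier
    latest = ordered.drop_duplicates(subset=record_id, keep="last")
    # A record whose newest version is marked deleted is treated as gone
    return latest[~latest["is_deleted"]].reset_index(drop=True)


# Hypothetical toy data: C1 was updated once, C2 was later deleted
rows = pd.DataFrame({
    "customer_id": ["C1", "C1", "C2", "C2"],
    "age": [30, 31, 40, 40],
    "event_time": [1.0, 2.0, 1.0, 3.0],
    "write_time": pd.to_datetime(["2021-01-01", "2021-01-02", "2021-01-01", "2021-01-03"]),
    "is_deleted": [False, False, False, True],
})
print(latest_non_deleted(rows, "customer_id"))
```

Only C1's latest row (age 31) survives; C2 disappears because its newest version is a delete marker.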

### Prepare training data for Collaborative Filtering model

Now that we have our training data, we need to transform a few variables into a proper input for our model. We'll use just two types of transformation: one-hot encoding for the categorical columns and TF-IDF (term frequency-inverse document frequency) for the product names.
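To make the TF-IDF step concrete, here is a small NumPy sketch of what `TfidfVectorizer(min_df=2)` computes under scikit-learn's default settings as we understand them: lowercased word tokens of two or more characters, raw term counts, smoothed IDF `ln((1 + n) / (1 + df(t))) + 1`, and L2-normalized rows. The product names are made up, and this is an illustration rather than a replacement for scikit-learn. (In `transform_cf_data` the vectorizer is fit on the *unique* product names, so document frequency counts distinct names.)

```python
import re

import numpy as np


def tfidf_min_df(docs, min_df=2):
    """Sketch of smoothed, L2-normalized TF-IDF with a min_df cutoff."""
    # Tokenize like scikit-learn's default token_pattern: words of 2+ characters
    tokenized = [re.findall(r"(?u)\b\w\w+\b", d.lower()) for d in docs]
    # Document frequency: number of documents containing each term
    df = {}
    for tokens in tokenized:
        for term in set(tokens):
            df[term] = df.get(term, 0) + 1
    # min_df drops terms that appear in fewer than `min_df` documents
    vocab = sorted(t for t, count in df.items() if count >= min_df)
    n = len(docs)
    idf = np.array([np.log((1 + n) / (1 + df[t])) + 1 for t in vocab])
    # Raw term counts weighted by IDF
    X = np.array([[tokens.count(t) for t in vocab] for tokens in tokenized], dtype=float) * idf
    # L2-normalize each non-empty row
    norms = np.linalg.norm(X, axis=1, keepdims=True)
    X = np.divide(X, norms, out=np.zeros_like(X), where=norms > 0)
    return vocab, X


vocab, X = tfidf_min_df(["Hot Fudge Sauce", "Hot Chocolate", "Fudge Brownie"])
print(vocab)  # ['fudge', 'hot']; 'sauce', 'chocolate', 'brownie' appear only once
print(X.round(3))
```

With `min_df=2`, words that occur in a single product name are dropped from the vocabulary, which keeps one-off words from adding near-empty columns.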

The helper functions below implement both transformations.

In [None]:
def transform_cf_data(training_df, inference_df=None):
    """
    Transform a pandas DataFrame into the sparse feature matrix expected
    by the collaborative filtering model.

    The encoders are always fit on `training_df`. If `inference_df` is
    given, it is transformed with those fitted encoders; otherwise
    `training_df` itself is transformed.

    :param training_df: pandas.DataFrame used to fit the encoders
    :param inference_df: optional pandas.DataFrame to transform
    :return: scipy.sparse.csr_matrix of dtype float32
    """
    enc = OneHotEncoder(handle_unknown='ignore')
    # Ignore terms that appear in fewer than 2 product names
    vectorizer = TfidfVectorizer(min_df=2)

    onehot_cols = ['product_id', 'customer_id', 'is_married', 'state']

    # Fit both encoders on the training data only
    enc.fit(training_df[onehot_cols])
    vectorizer.fit(training_df['product_name'].unique())

    # Transform the inference rows if provided, otherwise the training rows
    target_df = training_df if inference_df is None else inference_df
    onehot_output = enc.transform(target_df[onehot_cols])
    tfidf_output = vectorizer.transform(target_df['product_name'])

    # Concatenate the one-hot and TF-IDF features column-wise
    X = hstack([onehot_output, tfidf_output], format='csr', dtype='float32')
    return X


def load_dataset(df):
    """
    Transform a DataFrame and split it into features
    and target variable.

    :param df: pandas.DataFrame
    :return: tuple(scipy.sparse.csr_matrix, numpy.ndarray)
    """
    X = transform_cf_data(df)
    y = df['rating'].values.astype('float32')
    return X, y
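At inference time, `transform_cf_data` fits the encoders on the training data and only transforms the inference rows, so a category never seen in training (for example, a new `customer_id`) can't add a column. With `handle_unknown='ignore'`, scikit-learn encodes such a value as all zeros in that feature's block. The pure-Python sketch below mimics that behavior; the helper names and IDs are hypothetical.

```python
def fit_categories(values):
    """Learn the sorted list of categories seen in training."""
    return sorted(set(values))


def one_hot_ignore_unknown(values, categories):
    """One-hot encode `values`; values unseen in training become all-zero rows."""
    index = {c: i for i, c in enumerate(categories)}
    rows = []
    for v in values:
        row = [0.0] * len(categories)
        if v in index:  # unknown values are silently ignored
            row[index[v]] = 1.0
        rows.append(row)
    return rows


train_ids = ["C1", "C2", "C3"]
categories = fit_categories(train_ids)
print(one_hot_ignore_unknown(["C2", "C9"], categories))
# [[0.0, 1.0, 0.0], [0.0, 0.0, 0.0]]
```

For a customer absent from the training data, the `customer_id` block is all zeros, so the prediction rests on the remaining features.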

We load and transform the dataset.

In [None]:
X, y = load_dataset(df_cf_features)

Then split our data into train and test sets.

In [None]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

print(X_train.shape, X_test.shape, y_train.shape, y_test.shape)

Lastly, the Factorization Machines algorithm expects its training data in the RecordIO-protobuf format.

In this format, each observation is converted into a binary representation as a set of 4-byte floats, which is stored in the `values` field of a protobuf message (for sparse input like ours, together with the indices of the nonzero entries).
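Under the hood, RecordIO wraps each serialized record in a small header. The sketch below follows the framing we understand the SageMaker Python SDK to use: a 4-byte magic number `0xced7230a`, a 4-byte payload length, then the payload padded to a 4-byte boundary. For simplicity the payload here is raw little-endian float32 bytes, not a real protobuf `Record` message, so treat it as an illustration of the framing and of the 4-byte-float encoding only; use `smac.write_spmatrix_to_sparse_tensor` for real training data.

```python
import io
import struct

import numpy as np

MAGIC = 0xCED7230A  # RecordIO magic number (written little-endian here)


def write_recordio(buf, payload: bytes) -> None:
    """Frame one payload as a RecordIO record: magic, length, data, padding."""
    buf.write(struct.pack("<I", MAGIC))
    buf.write(struct.pack("<I", len(payload)))
    buf.write(payload)
    # Pad the payload out to a multiple of 4 bytes
    buf.write(b"\x00" * (-len(payload) % 4))


def read_recordio(buf):
    """Yield payloads back out of a RecordIO stream."""
    while True:
        header = buf.read(8)
        if len(header) < 8:
            return
        magic, length = struct.unpack("<II", header)
        if magic != MAGIC:
            raise ValueError("bad RecordIO magic number")
        payload = buf.read(length)
        buf.read(-length % 4)  # skip padding
        yield payload


# Each observation becomes a run of 4-byte floats
row = np.array([1.0, 0.0, 3.5], dtype="<f4")
buf = io.BytesIO()
write_recordio(buf, row.tobytes())
print(len(buf.getvalue()))  # 8-byte header + 12 bytes of floats = 20
buf.seek(0)
decoded = [np.frombuffer(p, dtype="<f4") for p in read_recordio(buf)]
print(decoded[0])
```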

So let's convert our training data to this RecordIO format and upload it to S3.

In [None]:
def write_dataset_to_protobuf(X, y, bucket, prefix, key):
    """
    Serialize a sparse feature matrix and its labels as RecordIO-protobuf
    and upload the result to S3.

    :param X: scipy.sparse.csr_matrix of features
    :param y: numpy.ndarray of labels
    :param bucket: str, S3 bucket name
    :param prefix: str, S3 key prefix
    :param key: str, S3 object name
    :return: str, S3 URI of the uploaded object
    """
    buf = io.BytesIO()
    smac.write_spmatrix_to_sparse_tensor(buf, X, y)
    buf.seek(0)  # rewind so the upload reads from the start
    obj = f"{prefix}/{key}"
    boto3.resource("s3").Bucket(bucket).Object(obj).upload_fileobj(buf)
    return f"s3://{bucket}/{obj}"

train_data_location = write_dataset_to_protobuf(X_train, y_train, default_bucket, train_prefix, train_key)
test_data_location = write_dataset_to_protobuf(X_test, y_test, default_bucket, test_prefix, test_key)

print(train_data_location)
print(test_data_location)
print("Output: {}".format(output_prefix))

In [None]:
# Add variables to be saved for later notebooks
ps.add({'train_data_location': train_data_location,
        'test_data_location': test_data_location})

### Train Collaborative Filtering model using SageMaker

Let's create a collaborative filtering model. A collaborative filtering model predicts a user's interests by looking at the interests of many other users. For example, to recommend an item to user A, you might base the recommendation on the interests of a similar user B.
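The user A / user B intuition can be sketched with cosine similarity over a user-item rating matrix. This is a classic neighborhood-based approach, shown only to illustrate the idea; it is not what Factorization Machines does internally. The ratings below are made up.

```python
import numpy as np

# Hypothetical ratings: rows are users A, B, C; columns are items; 0 means unrated
ratings = np.array([
    [5.0, 4.0, 0.0, 0.0],  # user A
    [5.0, 5.0, 4.0, 0.0],  # user B
    [0.0, 0.0, 1.0, 5.0],  # user C
])


def cosine_similarity_matrix(m):
    """Pairwise cosine similarity between the rows of `m`."""
    unit = m / np.linalg.norm(m, axis=1, keepdims=True)
    return unit @ unit.T


sim = cosine_similarity_matrix(ratings)
user_a = 0
# Most similar other user to A (exclude A itself)
others = [u for u in range(len(ratings)) if u != user_a]
neighbor = max(others, key=lambda u: sim[user_a, u])
# Recommend items the neighbor rated that A has not rated yet
candidates = [i for i in range(ratings.shape[1])
              if ratings[user_a, i] == 0 and ratings[neighbor, i] > 0]
print(neighbor, candidates)  # 1 [2]
```

User B shares A's taste for items 0 and 1, so B's rating of item 2 becomes a recommendation for A.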

For our purposes, we'll use [Factorization Machines](https://docs.aws.amazon.com/sagemaker/latest/dg/fact-machines.html) as our collaborative filtering model. Factorization Machines is a general-purpose supervised learning algorithm that you can use for both classification and regression tasks. It extends a linear model to economically capture interactions between features in high-dimensional sparse datasets.
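For reference, a degree-2 factorization machine with $n$ features and $k$ latent factors predicts

$$\hat{y}(\mathbf{x}) = w_0 + \sum_{i=1}^{n} w_i x_i + \sum_{i=1}^{n}\sum_{j=i+1}^{n} \langle \mathbf{v}_i, \mathbf{v}_j \rangle x_i x_j$$

where each feature $i$ has a $k$-dimensional factor vector $\mathbf{v}_i$ (the `num_factors` hyperparameter set below plays the role of $k$). The pairwise term can be rewritten as $\frac{1}{2}\sum_{f=1}^{k}\big[(\sum_i v_{i,f} x_i)^2 - \sum_i v_{i,f}^2 x_i^2\big]$, which costs $O(kn)$ instead of $O(kn^2)$; that is what makes the model economical. The NumPy sketch below, with random weights, checks that the two forms agree. It is an illustration, not the SageMaker implementation.

```python
import numpy as np


def fm_predict_naive(x, w0, w, V):
    """Degree-2 FM prediction with an explicit O(k n^2) pairwise loop."""
    n = len(x)
    pairwise = 0.0
    for i in range(n):
        for j in range(i + 1, n):
            pairwise += V[i] @ V[j] * x[i] * x[j]
    return w0 + w @ x + pairwise


def fm_predict_fast(x, w0, w, V):
    """Same prediction using the O(k n) reformulation of the pairwise term."""
    xv = x @ V                  # shape (k,): sum_i v_{i,f} x_i
    x2v2 = (x ** 2) @ (V ** 2)  # shape (k,): sum_i v_{i,f}^2 x_i^2
    return w0 + w @ x + 0.5 * np.sum(xv ** 2 - x2v2)


rng = np.random.default_rng(0)
n, k = 6, 3  # n features, k latent factors
x = rng.random(n)
w0, w, V = 0.1, rng.normal(size=n), rng.normal(size=(n, k))
print(fm_predict_naive(x, w0, w, V), fm_predict_fast(x, w0, w, V))
```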

Essentially, our collaborative filtering model will recommend products based on historical user-product interaction.

<img src="./img/collab-inputs.png" alt="collab filtering model inputs" style="width: 500px;"/>

Define an `Estimator` that uses the Factorization Machines container image.

In [None]:
container = sagemaker.image_uris.retrieve("factorization-machines", region=region)

fm = sagemaker.estimator.Estimator(
    container,
    role,
    instance_count=1,
    instance_type="ml.c5.xlarge",
    output_path=output_prefix,
    sagemaker_session=sagemaker_session,
)

# Set our hyperparameters
input_dims = X_train.shape[1]
fm.set_hyperparameters(
    feature_dim=input_dims,
    predictor_type="regressor",
    mini_batch_size=1000,
    num_factors=64,
    epochs=20,
)

Train the model.

In [None]:
fm.fit({'train': train_data_location, 'test': test_data_location})

In [None]:
training_job_name = fm.latest_training_job.job_name

### Deploy Collaborative Filtering model

Now that we've trained our model, let's deploy it as a real-time endpoint.

In [None]:
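cf_model_predictor = fm.deploy(
    endpoint_name=cf_model_endpoint_name,
    initial_instance_count=1,
    instance_type="ml.m4.xlarge",
    serializer=FMSerializer(),        # helper expected from `utils`; not defined in this notebook
    deserializer=JSONDeserializer(),
    wait=False,                       # return immediately; the endpoint keeps creating in the background
)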
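The `FMSerializer` used above is not defined in this notebook. As a point of reference, SageMaker's documented dense JSON inference format for Factorization Machines is a request of the form `{"instances": [{"features": [...]}, ...]}`, with one feature list per row, and a regressor response of the form `{"predictions": [{"score": ...}, ...]}`. The hypothetical helpers below build and parse those bodies from a dense array; the names are ours, and we haven't confirmed that `FMSerializer` behaves exactly the same way.

```python
import json

import numpy as np


def fm_json_payload(rows) -> str:
    """Build a dense JSON request body: one {"features": [...]} object per row."""
    rows = np.asarray(rows, dtype="float32")
    return json.dumps({"instances": [{"features": row.tolist()} for row in rows]})


def fm_scores(response_body: str):
    """Extract regressor scores from a {"predictions": [{"score": ...}]} response."""
    return [p["score"] for p in json.loads(response_body)["predictions"]]


print(fm_json_payload([[1, 0, 0.5], [0, 1, 0]]))
# {"instances": [{"features": [1.0, 0.0, 0.5]}, {"features": [0.0, 1.0, 0.0]}]}
print(fm_scores('{"predictions": [{"score": 4.2}, {"score": 3.1}]}'))  # [4.2, 3.1]
```

A dense payload like this is why the inference features are later converted with `.toarray()` before being sent.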

In [None]:
cf_model_predictor.endpoint_name

### Query Feature Store for Ranking model training data

Now that we've trained our collaborative filtering model, let's move on to our ranking model.

First, let's query the offline feature store (across multiple `FeatureGroups`) to get the data we'll need to train our ranking model.

In [None]:
query = f'''
select bought,
       healthy_activity_last_2m,
       product_health_index,
       customer_health_index,
       product_category
from (
    select c.customer_health_index,
           cs.product_id,
           cs.healthy_activity_last_2m,
           cs.bought
    from "{click_stream_historical_table}" as cs
    left join "{customers_table}" as c
    on cs.customer_id = c.customer_id
) click_stream_customers
left join
(select * from "{products_table}") products
on click_stream_customers.product_id = products.product_id
'''

df_rank_features, query = query_offline_store(click_stream_feature_group_name, query,
                                              sagemaker_session)
df_rank_features.head()

As with the collaborative filtering query, we skip filtering on the offline store's metadata columns (duplicate record versions and records whose `is_deleted` column is `True`) to keep the query readable. See our [docs](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store-athena-glue-integration.html) for examples.

### Prepare training data for Ranking model

The only transformation our ranking model data needs is one-hot encoding the product categories.

In [None]:
# One-hot encode product_category; dtype=int keeps the dummies as 0/1
# (pandas 2.x defaults to bool, which would be written to CSV as True/False)
df_rank_features = pd.get_dummies(df_rank_features, columns=['product_category'], prefix='prod_cat', dtype=int)

In [None]:
df_rank_features.head()

Now let's shuffle and split our data into training and validation sets and save them to disk.

In [None]:
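# Shuffle, then split 70% train / 20% validation; the remaining 10% is held out and unused here
train_data, validation_data, _ = np.split(df_rank_features.sample(frac=1, random_state=1729),
                                          [int(0.7 * len(df_rank_features)), int(0.9 * len(df_rank_features))])
# SageMaker XGBoost expects headerless CSV with the label (`bought`) in the first column
train_data.to_csv('train.csv', header=False, index=False)
validation_data.to_csv('validation.csv', header=False, index=False)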
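Before uploading, a quick sanity check can catch the two most common CSV problems for the built-in XGBoost container: the label not being the first column, and non-numeric columns (for example, boolean dummies that pandas would write as `True`/`False`, which would most likely fail to parse). The helper below is a hypothetical sketch, not part of the SageMaker SDK; you could call it as `check_xgboost_csv_frame(train_data, 'bought')`.

```python
import pandas as pd
from pandas.api.types import is_bool_dtype, is_numeric_dtype


def check_xgboost_csv_frame(df: pd.DataFrame, label: str) -> None:
    """Raise ValueError if `df` doesn't look like headerless numeric XGBoost CSV data."""
    if df.columns[0] != label:
        raise ValueError(f"label column {label!r} must come first, got {df.columns[0]!r}")
    # pandas treats bool as numeric, so check for it explicitly
    bad = [c for c in df.columns if is_bool_dtype(df[c]) or not is_numeric_dtype(df[c])]
    if bad:
        raise ValueError(f"non-numeric columns: {bad}")


# Hypothetical toy frame in the expected layout: label first, all columns numeric
toy = pd.DataFrame({"bought": [1, 0], "healthy_activity_last_2m": [3, 0], "prod_cat_a": [1, 0]})
check_xgboost_csv_frame(toy, "bought")  # passes silently
```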

Now upload those datasets to S3 and prepare our training and validation inputs.

In [None]:
# Upload the CSVs; `prefix` is 'recsys-feature-store' at this point
s3_resource = boto3.Session().resource('s3')
rank_train_key = f'{prefix}/train/train.csv'
rank_validation_key = f'{prefix}/validation/validation.csv'
s3_resource.Bucket(default_bucket).Object(rank_train_key).upload_file('train.csv')
s3_resource.Bucket(default_bucket).Object(rank_validation_key).upload_file('validation.csv')
s3_input_train = TrainingInput(s3_data=f's3://{default_bucket}/{rank_train_key}', content_type='csv')
s3_input_validation = TrainingInput(s3_data=f's3://{default_bucket}/{rank_validation_key}', content_type='csv')

### Train Ranking model

Our ranking model will be an XGBoost model. It will rerank the recommended products from the collaborative filtering model by taking the user's click-stream activity and using that to make personalized recommendations.

<img src="./img/ranking-inputs.png" alt="Ranking model inputs" style="width: 500px;"/>

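We'll predict `bought`, a binary (0/1) label indicating whether the user bought the item. SageMaker's built-in XGBoost reads the label from the first column of CSV training data, which is why `bought` is selected first in the query above.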
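Putting the two models together, reranking amounts to scoring each collaborative filtering candidate with the ranking model and sorting by the predicted probability that the customer buys it (with `objective='binary:logistic'`, XGBoost outputs a probability between 0 and 1). In the sketch below, `fake_score` is a hypothetical stand-in for a call to the ranking endpoint, and the candidate features are made up.

```python
from typing import Callable, Dict, List, Sequence


def rerank(candidates: Sequence[str],
           features: Dict[str, List[float]],
           score_fn: Callable[[List[float]], float],
           top_k: int) -> List[str]:
    """Order CF candidate products by the ranking model's buy probability."""
    scored = [(product_id, score_fn(features[product_id])) for product_id in candidates]
    # Highest predicted probability first; the sort is stable, so ties keep CF order
    scored.sort(key=lambda pair: pair[1], reverse=True)
    return [product_id for product_id, _ in scored[:top_k]]


# Hypothetical features per candidate: [healthy_activity_last_2m, product_health_index]
features = {"P1": [0.2, 0.1], "P2": [0.9, 0.8], "P3": [0.5, 0.4]}


def fake_score(f: List[float]) -> float:
    """Stand-in for the ranking endpoint: any function returning a probability."""
    return (f[0] + f[1]) / 2


print(rerank(["P1", "P2", "P3"], features, fake_score, top_k=2))  # ['P2', 'P3']
```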

In [None]:
container = sagemaker.image_uris.retrieve('xgboost', region, version='1.2-2')

xgb = sagemaker.estimator.Estimator(container,
                                    role,
                                    instance_count=1,
                                    instance_type='ml.m4.xlarge',
                                    output_path=f's3://{default_bucket}/{prefix}/output',
                                    sagemaker_session=sagemaker_session)

xgb.set_hyperparameters(
    max_depth=5,
    eta=0.2,
    gamma=4,
    min_child_weight=6,
    subsample=0.7,
    objective='binary:logistic',  # outputs the probability that `bought` is 1
    num_round=50,
    verbosity=2
)

xgb.fit({'train': s3_input_train, 'validation': s3_input_validation})

### Deploy Ranking model

Now that we've trained our ranking model, let's deploy it as a real-time endpoint!

In [None]:
xgb_predictor = xgb.deploy(
    endpoint_name=ranking_model_endpoint_name,
    initial_instance_count=1,
    instance_type='ml.m4.xlarge',
    serializer=CSVSerializer(),
    wait=False
)

In [None]:
xgb_predictor.endpoint_name

### Save CF inference data

In [None]:
def top_rated_products_by_customer_state(customer_id, top_n):
    """
    Return the top_n highest-rated products in the customer's state,
    laid out like df_cf_features (minus `rating`) for CF inference.

    Uses the customer's record from the online store and the
    df_cf_features DataFrame queried above.
    """
    # Fetch the customer's features from the online feature store
    record = featurestore_runtime.get_record(FeatureGroupName=customers_feature_group_name,
                                             RecordIdentifierValueAsString=customer_id,
                                             FeatureNames=['state', 'is_married', 'age'])
    # Parse the returned features into a {name: value_as_string} dict
    other_customer_features = {feature['FeatureName']: feature['ValueAsString']
                               for feature in record['Record']}

    # Keep only training rows from the customer's state
    state = other_customer_features['state']
    df_cf_features_by_state = df_cf_features[df_cf_features['state'] == state]

    # Rank products in that state by mean rating, highest first
    popular_items = (df_cf_features_by_state
                     .groupby(['product_id', 'product_name'])['rating']
                     .agg('mean')
                     .sort_values(ascending=False)
                     .reset_index())
    # Attach this customer's features to every candidate product
    for k, v in other_customer_features.items():
        popular_items[k] = v
    popular_items['customer_id'] = customer_id

    # Match the CF training column order and drop the target column
    top_n_popular_items = popular_items.iloc[:top_n][df_cf_features.columns]
    return top_n_popular_items.drop(columns='rating')

To address the cold-start problem (if a customer has yet to purchase any items), we'll fetch the top-rated products in a given customer's state. We'll then transform this data (like we did with the collaborative filtering model's training data), and use it at the time of inference.

In [None]:
customer_id = 'C3571'
cf_inference_df = top_rated_products_by_customer_state(customer_id, 15)
cf_inference_payload = transform_cf_data(df_cf_features, cf_inference_df).toarray()

ps.add({'inference_customer_id': customer_id})

# Save the inference payload and DataFrame for the next notebook
%store cf_inference_payload
%store cf_inference_df
ps.store()

Go back to Workshop Studio and click on "Next".