{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Implementing a Recommender System with SageMaker, MXNet, and Gluon\n", "_**Making Video Recommendations Using Neural Networks and Embeddings**_\n", "\n", "--- \n", "\n", "---\n", "\n", "*This work is based on content from the [Cyrus Vahid's 2017 re:Invent Talk](https://github.com/cyrusmvahid/gluontutorials/blob/master/recommendations/MLPMF.ipynb)*\n", "\n", "\n", "## Contents\n", "\n", "1. [Background](#Background)\n", "1. [Setup](#Setup)\n", "1. [Data](#Data)\n", " 1. [Explore](#Explore)\n", " 1. [Clean](#Clean)\n", " 1. [Prepare](#Prepare)\n", "1. [Train Locally](#Train-Locally)\n", " 1. [Define Network](#Define-Network)\n", " 1. [Set Parameters](#Set-Parameters)\n", " 1. [Execute](#Execute)\n", "1. [Train with SageMaker](#Train-with-SageMaker)\n", " 1. [Wrap Code](#Wrap-Code)\n", " 1. [Move Data](#Move-Data)\n", " 1. [Submit](#Submit)\n", "1. [Host](#Host)\n", " 1. [Evaluate](#Evaluate)\n", "1. [Wrap-up](#Wrap-up)\n", "\n", "---\n", "\n", "## Background\n", "\n", "In many ways, recommender systems were a catalyst for the current popularity of machine learning. One of Amazon's earliest successes was the \"Customers who bought this, also bought...\" feature, while the million dollar Netflix Prize spurred research, raised public awareness, and inspired numerous other data science competitions.\n", "\n", "Recommender systems can utilize a multitude of data sources and ML algorithms, and most combine various unsupervised, supervised, and reinforcement learning techniques into a holistic framework. However, the core component is almost always a model which which predicts a user's rating (or purchase) for a certain item based on that user's historical ratings of similar items as well as the behavior of other similar users. The minimal required dataset for this is a history of user item ratings. In our case, we'll use 1 to 5 star ratings from over 2M Amazon customers on over 160K digital videos. More details on this dataset can be found at its [AWS Public Datasets page](https://s3.amazonaws.com/amazon-reviews-pds/readme.html).\n", "\n", "Matrix factorization has been the cornerstone of most user-item prediction models. This method starts with the large, sparse, user-item ratings in a single matrix, where users index the rows, and items index the columns. It then seeks to find two lower-dimensional, dense matrices which, when multiplied together, preserve the information and relationships in the larger matrix.\n", "\n", "![image](https://data-artisans.com/img/blog/factorization.svg)\n", "\n", "Matrix factorization has been extended and genarlized with deep learning and embeddings. These techniques allows us to introduce non-linearities for enhanced performance and flexibility. This notebook will fit a neural network-based model to generate recommendations for the Amazon video dataset. It will start by exploring our data in the notebook and even training a model on a sample of the data. Later we'll expand to the full dataset and fit our model using a SageMaker managed training cluster. We'll then deploy to an endpoint and check our method.\n", "\n", "---\n", "\n", "## Setup\n", "\n", "_This notebook was created and tested on an ml.p2.xlarge notebook instance._\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's load the Python libraries we'll need for the remainder of this example notebook." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import mxnet as mx\n", "from mxnet import gluon, nd, ndarray\n", "from mxnet.metric import MSE\n", "import pandas as pd\n", "import numpy as np\n", "import sagemaker\n", "import boto3\n", "import json\n", "import matplotlib.pyplot as plt" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [ "parameters" ] }, "outputs": [], "source": [ "# Set optimization parameters\n", "opt = 'sgd'\n", "lr = 0.02\n", "momentum = 0.9\n", "wd = 0." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "## Data\n", "\n", "### Explore\n", "\n", "Let's start by bringing in our dataset from an S3 public bucket. As mentioned above, this contains 1 to 5 star ratings from over 2M Amazon customers on over 160K digital videos. More details on this dataset can be found at its [AWS Public Datasets page](https://s3.amazonaws.com/amazon-reviews-pds/readme.html).\n", "\n", "_Note, because this dataset is over a half gigabyte, the load from S3 may take ~10 minutes. Also, since Amazon SageMaker Notebooks start with a 5GB persistent volume by default, and we don't need to keep this data on our instance for long, we'll bring it to the temporary volume (which has up to 20GB of storage)._" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "!mkdir /tmp/recsys/\n", "!aws s3 cp s3://amazon-reviews-pds/tsv/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz /tmp/recsys/" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's read the data into a [Pandas DataFrame](https://pandas.pydata.org/pandas-docs/stable/generated/pandas.DataFrame.html) so that we can begin to understand it.\n", "\n", "*Note, we'll set `error_bad_lines=False` when reading the file in as there appear to be a very small number of records which would create a problem otherwise.*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = pd.read_csv('/tmp/recsys/amazon_reviews_us_Digital_Video_Download_v1_00.tsv.gz', delimiter='\\t',error_bad_lines=False)\n", "df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see this dataset includes information like:\n", "\n", "- `marketplace`: 2-letter country code (in this case all \"US\").\n", "- `customer_id`: Random identifier that can be used to aggregate reviews written by a single author.\n", "- `review_id`: A unique ID for the review.\n", "- `product_id`: The Amazon Standard Identification Number (ASIN). `http://www.amazon.com/dp/` links to the product's detail page.\n", "- `product_parent`: The parent of that ASIN. 
Multiple ASINs (color or format variations of the same product) can roll up into a single product parent.\n", "- `product_title`: Title description of the product.\n", "- `product_category`: Broad product category that can be used to group reviews (in this case digital videos).\n", "- `star_rating`: The review's rating (1 to 5 stars).\n", "- `helpful_votes`: Number of helpful votes for the review.\n", "- `total_votes`: Number of total votes the review received.\n", "- `vine`: Was the review written as part of the [Vine](https://www.amazon.com/gp/vine/help) program?\n", "- `verified_purchase`: Was the review from a verified purchase?\n", "- `review_headline`: The title of the review itself.\n", "- `review_body`: The text of the review.\n", "- `review_date`: The date the review was written.\n", "\n", "For this example, let's limit ourselves to `customer_id`, `product_id`, and `star_rating`. Including additional features in our recommendation system could be beneficial, but would require substantial processing (particularly of the text data), which would take us beyond the scope of this notebook.\n", "\n", "*Note: we'll keep `product_title` in the dataset to help verify our recommendations later in the notebook, but it will not be used in algorithm training.*" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = df[['customer_id', 'product_id', 'star_rating', 'product_title']]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Because most people haven't seen most videos, and people rate fewer videos than they actually watch, we'd expect our data to be sparse. Our algorithm should handle this sparsity well in general, but we may still want to clean out some of the long tail. Let's look at some basic percentiles to confirm." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customers = df['customer_id'].value_counts()\n", "products = df['product_id'].value_counts()\n", "\n", "quantiles = [0, 0.01, 0.02, 0.03, 0.04, 0.05, 0.1, 0.25, 0.5, 0.75, 0.9, 0.95, 0.96, 0.97, 0.98, 0.99, 1]\n", "print('customers\\n', customers.quantile(quantiles))\n", "print('products\\n', products.quantile(quantiles))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As we can see, only about 5% of customers have rated 5 or more videos, and only 25% of videos have been rated by 9+ customers.\n", "\n", "### Clean\n", "\n", "Let's filter out this long tail." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customers = customers[customers >= 5]\n", "products = products[products >= 10]\n", "\n", "reduced_df = df.merge(pd.DataFrame({'customer_id': customers.index})).merge(pd.DataFrame({'product_id': products.index}))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we'll recreate our customer and product lists, since there are customers with 5 or more reviews whose reviews are all on products with fewer than 10 reviews (and vice versa), and those reviews were removed by the filtering above." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customers = reduced_df['customer_id'].value_counts()\n", "products = reduced_df['product_id'].value_counts()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Next, we'll number each user and item, giving them their own sequential index. This will allow us to hold the information in a sparse format, where the sequential indices indicate the row and column in our ratings matrix."
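] }, { "cell_type": "markdown", "metadata": {}, "source": [ "As a quick aside, here is a toy illustration (with made-up values, assuming `scipy` is available on the instance) of how such (user index, item index, rating) triples describe a sparse ratings matrix, where the sequential indices give the row and column of each rating. We'll build the real indices for our dataset in the next cell." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Toy illustration only: made-up (user, item, rating) triples stored as a sparse matrix\n", "import numpy as np\n", "from scipy.sparse import coo_matrix\n", "\n", "toy_users = np.array([0, 0, 1, 2])    # sequential user indices (rows)\n", "toy_items = np.array([1, 3, 0, 2])    # sequential item indices (columns)\n", "toy_ratings = np.array([5, 3, 4, 1])  # star ratings (values)\n", "\n", "toy_matrix = coo_matrix((toy_ratings, (toy_users, toy_items)), shape=(3, 4))\n", "print(toy_matrix.toarray())"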
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "customer_index = pd.DataFrame({'customer_id': customers.index, 'user': np.arange(customers.shape[0])})\n", "product_index = pd.DataFrame({'product_id': products.index, \n", " 'item': np.arange(products.shape[0])})\n", "\n", "reduced_df = reduced_df.merge(customer_index).merge(product_index)\n", "reduced_df.head()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Prepare\n", "\n", "Let's start by splitting in training and test sets. This will allow us to estimate the model's accuracy on videos our customers rated, but wasn't included in our training." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "test_df = reduced_df.groupby('customer_id').last().reset_index()\n", "\n", "train_df = reduced_df.merge(test_df[['customer_id', 'product_id']], \n", " on=['customer_id', 'product_id'], \n", " how='outer', \n", " indicator=True)\n", "train_df = train_df[(train_df['_merge'] == 'left_only')]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, we can convert our Pandas DataFrames into MXNet NDArrays, use those to create a member of the SparseMatrixDataset class, and add that to an MXNet Data Iterator. This process is the same for both test and control." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "batch_size = 1024\n", "\n", "train = gluon.data.ArrayDataset(nd.array(train_df['user'].values, dtype=np.float32),\n", " nd.array(train_df['item'].values, dtype=np.float32),\n", " nd.array(train_df['star_rating'].values, dtype=np.float32))\n", "test = gluon.data.ArrayDataset(nd.array(test_df['user'].values, dtype=np.float32),\n", " nd.array(test_df['item'].values, dtype=np.float32),\n", " nd.array(test_df['star_rating'].values, dtype=np.float32))\n", "\n", "train_iter = gluon.data.DataLoader(train, shuffle=True, num_workers=4, batch_size=batch_size, last_batch='rollover')\n", "test_iter = gluon.data.DataLoader(train, shuffle=True, num_workers=4, batch_size=batch_size, last_batch='rollover')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "---\n", "\n", "## Train Locally\n", "\n", "### Define Network\n", "\n", "Let's start by defining the neural network version of our matrix factorization task. In this case, our network is quite simple. The main components are:\n", "- [Embeddings](https://mxnet.incubator.apache.org/api/python/gluon/nn.html#mxnet.gluon.nn.Embedding) which turn our indexes into dense vectors of fixed size. In this case, 64.\n", "- [Dense layers](https://mxnet.incubator.apache.org/api/python/gluon.html#mxnet.gluon.nn.Dense) with ReLU activation. Each dense layer has the same number of units as our number of embeddings. Our ReLU activation here also adds some non-linearity to our matrix factorization.\n", "- [Dropout layers](https://mxnet.incubator.apache.org/api/python/gluon.html#mxnet.gluon.nn.Dropout) which can be used to prevent over-fitting.\n", "- Matrix multiplication of our user matrix and our item matrix to create an estimate of our rating matrix." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "class MFBlock(gluon.HybridBlock):\n", " def __init__(self, max_users, max_items, num_emb, dropout_p=0.5):\n", " super(MFBlock, self).__init__()\n", " \n", " self.max_users = max_users\n", " self.max_items = max_items\n", " self.dropout_p = dropout_p\n", " self.num_emb = num_emb\n", " \n", " with self.name_scope():\n", " self.user_embeddings = gluon.nn.Embedding(max_users, num_emb)\n", " self.item_embeddings = gluon.nn.Embedding(max_items, num_emb)\n", " \n", " self.dropout_user = gluon.nn.Dropout(dropout_p)\n", " self.dropout_item = gluon.nn.Dropout(dropout_p)\n", "\n", " self.dense_user = gluon.nn.Dense(num_emb, activation='relu')\n", " self.dense_item = gluon.nn.Dense(num_emb, activation='relu')\n", " \n", " def hybrid_forward(self, F, users, items):\n", " a = self.user_embeddings(users)\n", " a = self.dense_user(a)\n", " \n", " b = self.item_embeddings(items)\n", " b = self.dense_item(b)\n", "\n", " predictions = self.dropout_user(a) * self.dropout_item(b) \n", " predictions = F.sum(predictions, axis=1)\n", " return predictions" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "num_embeddings = 64\n", "\n", "net = MFBlock(max_users=customer_index.shape[0], \n", " max_items=product_index.shape[0],\n", " num_emb=num_embeddings,\n", " dropout_p=0.5)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Set Parameters\n", "\n", "Let's initialize network weights and set our optimization parameters." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Initialize network parameters\n", "ctx = mx.gpu()\n", "net.collect_params().initialize(mx.init.Xavier(magnitude=60),\n", " ctx=ctx,\n", " force_reinit=True)\n", "net.hybridize()\n", "\n", "\n", "trainer = gluon.Trainer(net.collect_params(),\n", " opt,\n", " {'learning_rate': lr,\n", " 'wd': wd,\n", " 'momentum': momentum})" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Execute\n", "\n", "Let's define a function to carry out the training of our neural network." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def execute(train_iter, test_iter, net, epochs, ctx):\n", " \n", " loss_function = gluon.loss.L2Loss()\n", " for e in range(epochs):\n", " \n", " print(\"epoch: {}\".format(e))\n", " \n", " for i, (user, item, label) in enumerate(train_iter):\n", " user = user.as_in_context(ctx)\n", " item = item.as_in_context(ctx)\n", " label = label.as_in_context(ctx)\n", " \n", " with mx.autograd.record():\n", " output = net(user, item) \n", " loss = loss_function(output, label)\n", " \n", " loss.backward()\n", " trainer.step(batch_size)\n", "\n", " print(\"EPOCH {}: MSE ON TRAINING and TEST: {}. {}\".format(e,\n", " eval_net(train_iter, net, ctx, loss_function),\n", " eval_net(test_iter, net, ctx, loss_function)))\n", " print(\"end of training\")\n", " return net" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Let's also define a function which evaluates our network on a given dataset. This is called by our `execute` function above to provide mean squared error values on our training and test datasets." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def eval_net(data, net, ctx, loss_function):\n", " acc = MSE()\n", " for i, (user, item, label) in enumerate(data):\n", " \n", " user = user.as_in_context(ctx)\n", " item = item.as_in_context(ctx)\n", " label = label.as_in_context(ctx)\n", " predictions = net(user, item).reshape((batch_size, 1))\n", " acc.update(preds=[predictions], labels=[label])\n", " \n", " return acc.get()[1]" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now, let's train for a few epochs." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "epochs = 3\n", "\n", "trained_net = execute(train_iter, test_iter, net, epochs, ctx)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Early Validation\n", "\n", "We can see our training error going down, but our validation accuracy bounces around a bit. Let's check how our model is predicting for an individual user. We could pick randomly, but for this case, let's try user #6." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "product_index['u6_predictions'] = trained_net(nd.array([6] * product_index.shape[0]).as_in_context(ctx), \n", " nd.array(product_index['item'].values).as_in_context(ctx)).asnumpy()\n", "product_index.sort_values('u6_predictions', ascending=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Now let's compare this to the predictions for another user (we'll try user #7)." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "product_index['u7_predictions'] = trained_net(nd.array([7] * product_index.shape[0]).as_in_context(ctx), \n", " nd.array(product_index['item'].values).as_in_context(ctx)).asnumpy()\n", "product_index.sort_values('u7_predictions', ascending=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "The predicted ratings are different between the two users, but the same top (and bottom) items for user #6 appear for #7 as well. Let's look at the correlation across the full set of 38K items to see if this relationship holds." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "product_index[['u6_predictions', 'u7_predictions']].plot.scatter('u6_predictions', 'u7_predictions')\n", "plt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see that this correlation is nearly perfect. Essentially the average rating of items dominates across users and we'll recommend the same well-reviewed items to everyone. As it turns out, we can add more embeddings and this relationship will go away since we're better able to capture differential preferences across users.\n", "\n", "However, with just a 64 dimensional embedding, it took 7 minutes to run just 3 epochs. If we ran this outside of our Notebook Instance we could run larger jobs and move on to other work would improve productivity.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "celltoolbar": "Tags", "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.8.1" }, "notice": "Copyright 2017 Amazon.com, Inc. 
or its affiliates. All Rights Reserved. Licensed under the Apache License, Version 2.0 (the \"License\"). You may not use this file except in compliance with the License. A copy of the License is located at http://aws.amazon.com/apache2.0/ or in the \"license\" file accompanying this file. This file is distributed on an \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License." }, "nbformat": 4, "nbformat_minor": 4 }