{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# CPG Industry - Amazon Personalization Sample\n", "\n", "This notebook provides the code used on: \" Hydrating your Amazon Personalize datasets with contextual information from the AWS Data Exchange\" Blog Post to explore how weather information context can be used to enrich models and improve the relevance of recomendatitons generated by Amazon Personalize. \n", "\n", "Recommended Time: 90 Min" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup\n", "\n", "To get started, we need to perform a bit of setup. Walk through each of the following steps to configure your environment to interact with the Amazon Personalize Service." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Import Dependencies and Setup Boto3 Python Clients\n", "\n", "Throughout this workshop we will need access to some common libraries and clients for connecting to AWS services. We also have to retrieve Uid from a SageMaker notebook instance tag." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import Dependencies\n", "\n", "import boto3\n", "import json\n", "import pandas as pd\n", "import numpy as np\n", "import seaborn as sns\n", "import matplotlib.pyplot as plt\n", "import time\n", "import requests\n", "import csv\n", "import sys\n", "import botocore\n", "import uuid\n", "\n", "from datetime import datetime\n", "from datetime import date\n", "from packaging import version\n", "from random import randint\n", "from botocore.exceptions import ClientError\n", "\n", "\n", "%matplotlib inline\n", "\n", "# Setup Clients\n", "\n", "personalize = boto3.client('personalize')\n", "personalize_runtime = boto3.client('personalize-runtime')\n", "personalize_events = boto3.client('personalize-events')\n", "s3 = boto3.client('s3')\n", "\n", "with open('/opt/ml/metadata/resource-metadata.json') as f:\n", " data = json.load(f)\n", "sagemaker = boto3.client('sagemaker')\n", "sagemakerResponce = sagemaker.list_tags(ResourceArn=data[\"ResourceArn\"])\n", "for tag in sagemakerResponce[\"Tags\"]:\n", " if tag['Key'] == 'Uid':\n", " Uid = tag['Value']\n", " break" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Implement some visualization functions for displaying information of the products in a dataframe\n", "\n", "Throughout this workshop we will need to search information of products several times, this function will help us to do it without repeating the same code." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "def search_items_in_dataframe(item_list):\n", " df = pd.DataFrame() \n", " for x in range(len(item_list)):\n", " temp = products_dataset_df.loc[products_dataset_df['ITEM_ID'] == int(item_list[x]['itemId'])]\n", " df = df.append(temp, ignore_index=True)\n", " pd.set_option('display.max_rows', 10)\n", " return df\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Configure Bucket and Data Output Location" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will be configuring some variables that will store the location of our source data. Substitute the name of the bucket we will create later with your own. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "bucket = \"cpg-personalize-weather\" # Use your own bucket\n", "items_filename = \"items.csv\" # Do Not Change\n", "users_filename = \"users.csv\" # Do Not Change\n", "interactions_filename = \"interactions.csv\" # Do Not Change" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get, Prepare, and Upload User, Product, and Interaction Data\n", "\n", "First we need to create a bucket to store the datasets for Personalize to consume them. \n", "\n", "Let's get started." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3.create_bucket(Bucket=bucket)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Download and Explore and clean the Weather Dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "weather_df = pd.read_csv('/home/ec2-user/SageMaker/Weather/weather_data.csv')\n", "pd.set_option('display.max_rows', 5)\n", "weather_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Create a new DF with all the values related to Santiago de Chile -> Code ST650\n", "weather_df= weather_df[weather_df.location_id == 'ST650']\n", "weather_df" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#Helper function to find the temperature in weather dataset and return a categorical attribute for temperature\n", "\n", "def find_weather_data_by_timestamp(timestamp):\n", " date_ds = str(date.fromtimestamp(timestamp)).replace('-', '')\n", " data = weather_df.loc[weather_df['date'] == int(date_ds)]\n", " daily_temp = float (data['avg_temp'])\n", " \n", " if daily_temp < 5:\n", " return 'very cold'\n", " elif daily_temp >=5 and daily_temp < 10:\n", " return 'cold'\n", " elif daily_temp >=10 and daily_temp < 15:\n", " return 'slightly cold'\n", " elif daily_temp >= 15 and daily_temp < 21:\n", " return 'lukewarm'\n", " elif daily_temp >= 21 and daily_temp < 28:\n", " return 'hot'\n", " else:\n", " return 'very hot'" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#test a sample timestamp\n", "find_weather_data_by_timestamp(1587846532)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Download and Explore and clean the Products Dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "products_df = pd.read_csv('./items-origin.csv')\n", "pd.set_option('display.max_rows', 5)\n", "products_df\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Clean the product dataset and drop columms we don't need.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Prepare products Data\n", "\n", "When training models in Amazon Personalize, we can provide meta data about our items. For this workshop we will add each product's category and style to the item dataset. The product's unique identifier is required. Then we will rename the columns in our dataset to match our schema (defined later) and those expected by Personalize. Finally, we will save our dataset as a CSV and copy it to our S3 bucket." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "products_dataset_df = products_df[['id','category','type', 'size']]\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "products_dataset_df['category'].unique()" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "products_dataset_df = products_dataset_df.rename(columns = {'id':'ITEM_ID','category':'CATEGORY','type':'TYPE', 'size':'SIZE'})\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "pd.set_option('display.max_rows', 5)\n", "products_dataset_df" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "products_dataset_df.to_csv(items_filename, index=False)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Download and Explore the Users Dataset" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "users_df = pd.read_csv('./users-origin.csv')\n", "pd.set_option('display.max_rows', 5)\n", "users_df" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Prepare Users Data\n", "\n", "Similar to the items dataset we created above, we can provide metadata on our users when training models in Amazon Personalize. For this workshop we will include each user's id and persona. As before, we will rename the columns to match our schema, save the data as a CSV, and upload it to our S3 bucket." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "users_dataset_df = users_df[['id','persona']]\n", "users_dataset_df = users_dataset_df.rename(columns = {'id':'USER_ID','persona':'PERSONA'})\n", "\n", "users_dataset_df.to_csv(users_filename, index=False)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "users_dataset_df.head(5)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "products_dataset_df.info()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Create User-Items Interactions Dataset\n", "\n", "To mimic user behavior, we will be generating a new dataset that represents user interactions with items. To make the interactions more realistic, we will use a predefined shopper persona for each user to generate event types for products matching that persona. Each persona is composed of three product categories, separated by the \"_\" symbol.\n", "The upsampling process will create events for viewing products, adding products to a cart, checking out, and completing orders."
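, "\n", "To make the category weighting concrete, here is a small illustrative sketch (the persona value is hypothetical) of how one persona string is split and sampled in the generation code below:\n", "\n", "```python\n", "# Illustrative only: how a persona maps to a weighted category choice\n", "example_persona = 'beverages_snacks_dairy'  # hypothetical persona value\n", "preferred_categories = example_persona.split('_')\n", "# The first category is favored, matching the weights used in the generation cell\n", "np.random.choice(preferred_categories, 1, p=[0.6, 0.25, 0.15])[0]\n", "```\n"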
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "# Minimum number of interactions to generate\n", "min_interactions = 500000\n", "\n", "# Percentages of each event type to generate\n", "product_added_percent = .08\n", "cart_viewed_percent = .05\n", "checkout_started_percent = .02\n", "order_completed_percent = .01\n", "\n", "# Count of interactions generated for each event type\n", "product_viewed_count = 0\n", "product_added_count = 0\n", "cart_viewed_count = 0\n", "checkout_started_count = 0\n", "order_completed_count = 0\n", "\n", "# How many days in the past (from initial date) to start generating interactions\n", "days_back = 90\n", "\n", "# Select a start time between 2020/02/23 and 2020/10/22 to match the weather data from the sample\n", "date_time_obj = datetime.strptime('2020-06-25 09:27:53', '%Y-%m-%d %H:%M:%S')\n", "start_time = int(datetime.timestamp(date_time_obj))\n", "#start_time = int(time.time())\n", "\n", "\n", "next_timestamp = start_time - (days_back * 24 * 60 * 60)\n", "seconds_increment = int((start_time - next_timestamp) / min_interactions)\n", "next_update = start_time + 60\n", "\n", "assert seconds_increment > 0, \"Increase days_back or reduce min_interactions\"\n", "\n", "print('Minimum interactions to generate: {}'.format(min_interactions))\n", "print('Days back: {}'.format(days_back))\n", "print('Starting timestamp: {} ({})'.format(next_timestamp, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_timestamp))))\n", "print('Seconds increment: {}'.format(seconds_increment))\n", "\n", "print(\"Generating interactions... (this may take a few minutes)\")\n", "interactions = 0\n", "\n", "subsets_cache = {}\n", "\n", "with open(interactions_filename, 'w') as outfile:\n", "    f = csv.writer(outfile)\n", "    f.writerow([\"ITEM_ID\", \"USER_ID\", \"EVENT_TYPE\", \"TIMESTAMP\", \"DAILY_TEMPERATURE\"])\n", "\n", "    while interactions < min_interactions:\n", "        #if (time.time() > next_update):\n", "        #    rate = interactions / (time.time() - start_time)\n", "        #    to_go = (min_interactions - interactions) / rate\n", "        #    print('Generated {} interactions so far ({:0.2f} seconds to go)'.format(interactions, to_go))\n", "        #    next_update += 60\n", "\n", "        # Pick a random user\n", "        user = users_df.sample().iloc[0]\n", "\n", "        # Determine category affinity from user's persona\n", "        persona = user['persona']\n", "        preferred_categories = persona.split('_')\n", "\n", "        # Select category based on weighted preference of category order.\n", "        category = np.random.choice(preferred_categories, 1, p=[0.6, 0.25, 0.15])[0]\n", "\n", "        gender = user['gender']\n", "\n", "        # Check if subset data frame is already cached for category & gender\n", "        prods_subset_df = subsets_cache.get(category + gender)\n", "        if prods_subset_df is None:\n", "            # Select products from selected category without gender affinity or that match user's gender\n", "            prods_subset_df = products_df.loc[(products_df['category'] == category) & ((products_df['gender_affinity'] == gender) | (products_df['gender_affinity'].isnull()))]\n", "            # Update cache\n", "            subsets_cache[category + gender] = prods_subset_df\n", "\n", "        # Pick a random product from gender filtered subset\n", "        product = prods_subset_df.sample().iloc[0]\n", "\n", "        this_timestamp = next_timestamp + randint(0, seconds_increment)\n", "        daily_temp = find_weather_data_by_timestamp(this_timestamp)\n", "\n", "        f.writerow([product['id'],\n", "                    user['id'],\n", "                    'ProductViewed',\n", "                    this_timestamp,\n", "                    daily_temp])\n", "\n", "        next_timestamp += seconds_increment\n", "        product_viewed_count += 1\n", "        interactions += 1\n", "\n", "        if product_added_count < int(product_viewed_count * product_added_percent):\n", "            this_timestamp += randint(0, int(seconds_increment / 2))\n", "            daily_temp = find_weather_data_by_timestamp(this_timestamp)\n", "            f.writerow([product['id'],\n", "                        user['id'],\n", "                        'ProductAdded',\n", "                        this_timestamp,\n", "                        daily_temp])\n", "            interactions += 1\n", "            product_added_count += 1\n", "\n", "        if cart_viewed_count < int(product_viewed_count * cart_viewed_percent):\n", "            this_timestamp += randint(0, int(seconds_increment / 2))\n", "            daily_temp = find_weather_data_by_timestamp(this_timestamp)\n", "            f.writerow([product['id'],\n", "                        user['id'],\n", "                        'CartViewed',\n", "                        this_timestamp,\n", "                        daily_temp])\n", "            interactions += 1\n", "            cart_viewed_count += 1\n", "\n", "        if checkout_started_count < int(product_viewed_count * checkout_started_percent):\n", "            this_timestamp += randint(0, int(seconds_increment / 2))\n", "            daily_temp = find_weather_data_by_timestamp(this_timestamp)\n", "            f.writerow([product['id'],\n", "                        user['id'],\n", "                        'CheckoutStarted',\n", "                        this_timestamp,\n", "                        daily_temp])\n", "            interactions += 1\n", "            checkout_started_count += 1\n", "\n", "        if order_completed_count < int(product_viewed_count * order_completed_percent):\n", "            this_timestamp += randint(0, int(seconds_increment / 2))\n", "            daily_temp = find_weather_data_by_timestamp(this_timestamp)\n", "            f.writerow([product['id'],\n", "                        user['id'],\n", "                        'OrderCompleted',\n", "                        this_timestamp,\n", "                        daily_temp])\n", "            interactions += 1\n", "            order_completed_count += 1\n", "\n", "print(\"Done\")\n", "print(\"Total interactions: \" + str(interactions))\n", "print(\"Total product viewed: \" + str(product_viewed_count))\n", "print(\"Total product added: \" + str(product_added_count))\n", "print(\"Total cart viewed: \" + str(cart_viewed_count))\n", "print(\"Total checkout started: \" + str(checkout_started_count))\n", "print(\"Total order completed: \" + str(order_completed_count))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Open and Explore the Interactions Dataset" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interactions_df = pd.read_csv(interactions_filename)\n", "interactions_df" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Chart the counts of each `EVENT_TYPE` generated for the interactions dataset. We're simulating a site where visitors heavily view/browse products and to a lesser degree add products to their cart and check out." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "categorical_attributes = interactions_df.select_dtypes(include = ['object'])\n", "\n", "plt.figure(figsize=(16,3))\n", "sns.countplot(data = categorical_attributes, x = 'EVENT_TYPE')" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Take note of the `DAILY_TEMPERATURE` values included in our interactions dataset; you will use them later when requesting recommendations." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interactions_df['DAILY_TEMPERATURE'].unique()" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Chart the counts of each `DAILY_TEMPERATURE` value generated for the interactions dataset. Notice how the temperature changes with the seasons over the period covered by the sample interactions."
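, "\n", "If you want to see the seasonal trend directly, here is an optional sketch (assuming the `weather_df` loaded earlier, with its `date` column in YYYYMMDD format and its `avg_temp` column, is still in memory) that plots the average daily temperature over the sample period:\n", "\n", "```python\n", "# Optional: plot the average daily temperature over the sample period\n", "trend_df = weather_df.copy()\n", "trend_df['date'] = pd.to_datetime(trend_df['date'].astype(str), format='%Y%m%d')\n", "trend_df.plot(x='date', y='avg_temp', figsize=(16, 3), legend=False)\n", "plt.ylabel('avg_temp')\n", "plt.show()\n", "```\n"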
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "sns.countplot(data = categorical_attributes, x = 'DAILY_TEMPERATURE')" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Upload Data\n", "Now we will upload the data we prepared to S3." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "boto3.Session().resource('s3').Bucket(bucket).Object(interactions_filename).upload_file(interactions_filename)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "boto3.Session().resource('s3').Bucket(bucket).Object(items_filename).upload_file(items_filename)" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "boto3.Session().resource('s3').Bucket(bucket).Object(users_filename).upload_file(users_filename)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Configure Amazon Personalize\n", "\n", "Now that we've prepared our three datasets and uploaded them to S3, we'll need to configure the Amazon Personalize service to understand our data so that it can be used to train models for generating recommendations." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Create Schemas for Datasets\n", "\n", "Amazon Personalize requires a schema for each dataset so it can map the columns in our CSVs to fields for model training. Each schema is declared in JSON using the [Apache Avro](https://avro.apache.org/) format.\n", "\n", "Let's define and create schemas in Personalize for our datasets." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Items Dataset Schema" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "items_schema = {\n", "    \"type\": \"record\",\n", "    \"name\": \"Items\",\n", "    \"namespace\": \"com.amazonaws.personalize.schema\",\n", "    \"fields\": [\n", "        {\n", "            \"name\": \"ITEM_ID\",\n", "            \"type\": \"string\"\n", "        },\n", "        {\n", "            \"name\": \"CATEGORY\",\n", "            \"type\": \"string\",\n", "            \"categorical\": True\n", "        },\n", "        {\n", "            \"name\": \"TYPE\",\n", "            \"type\": \"string\",\n", "            \"categorical\": True\n", "        },\n", "        {\n", "            \"name\": \"SIZE\",\n", "            \"type\": \"string\",\n", "            \"categorical\": True\n", "        }\n", "    ],\n", "    \"version\": \"1.0\"\n", "}\n", "\n", "create_schema_response = personalize.create_schema(\n", "    name = \"cpg-weather-schema-items\",\n", "    schema = json.dumps(items_schema)\n", ")\n", "\n", "items_schema_arn = create_schema_response['schemaArn']\n", "print(json.dumps(create_schema_response, indent=2))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Users Dataset Schema" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "users_schema = {\n", "    \"type\": \"record\",\n", "    \"name\": \"Users\",\n", "    \"namespace\": \"com.amazonaws.personalize.schema\",\n", "    \"fields\": [\n", "        {\n", "            \"name\": \"USER_ID\",\n", "            \"type\": \"string\"\n", "        },\n", "        {\n", "            \"name\": \"PERSONA\",\n", "            \"type\": \"string\",\n", "            \"categorical\": True\n", "        }\n", "    ],\n", "    \"version\": \"1.0\"\n", "}\n", "\n", "create_schema_response = personalize.create_schema(\n", "    name = \"cpg-weather-users\",\n", "    schema = json.dumps(users_schema)\n", ")\n", "\n", "users_schema_arn = create_schema_response['schemaArn']\n", "print(json.dumps(create_schema_response, indent=2))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Interactions Dataset 
Schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interactions_schema = {\n", " \"type\": \"record\",\n", " \"name\": \"Interactions\",\n", " \"namespace\": \"com.amazonaws.personalize.schema\",\n", " \"fields\": [\n", " {\n", " \"name\": \"ITEM_ID\",\n", " \"type\": \"string\"\n", " },\n", " {\n", " \"name\": \"USER_ID\",\n", " \"type\": \"string\"\n", " },\n", " {\n", " \"name\": \"EVENT_TYPE\",\n", " \"type\": \"string\"\n", " },\n", " {\n", " \"name\": \"TIMESTAMP\",\n", " \"type\": \"long\"\n", " },\n", " {\n", " \"name\": \"DAILY_TEMPERATURE\",\n", " \"type\": \"string\",\n", " \"categorical\": True\n", " }\n", " ],\n", " \"version\": \"1.0\"\n", "}\n", "\n", "create_schema_response = personalize.create_schema(\n", " name = \"cpg-weather-interactions\",\n", " schema = json.dumps(interactions_schema)\n", ")\n", "\n", "interactions_schema_arn = create_schema_response['schemaArn']\n", "print(json.dumps(create_schema_response, indent=2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create and Wait for Dataset Group\n", "\n", "Next we need to create the dataset group that will contain our three datasets." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Dataset Group" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "create_dataset_group_response = personalize.create_dataset_group(\n", " name = 'cgp-weather-dataset'\n", ")\n", "dataset_group_arn = create_dataset_group_response['datasetGroupArn']\n", "print(json.dumps(create_dataset_group_response, indent=2))\n", "\n", "print(f'DatasetGroupArn = {dataset_group_arn}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Wait for Dataset Group to Have ACTIVE Status" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = None\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time:\n", " describe_dataset_group_response = personalize.describe_dataset_group(\n", " datasetGroupArn = dataset_group_arn\n", " )\n", " status = describe_dataset_group_response[\"datasetGroup\"][\"status\"]\n", " print(\"DatasetGroup: {}\".format(status))\n", " \n", " if status == \"ACTIVE\" or status == \"CREATE FAILED\":\n", " break\n", " \n", " time.sleep(15)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Items Dataset\n", "\n", "Next we will create the datasets in Personalize for our three dataset types. Let's start with the items dataset." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dataset_type = \"ITEMS\"\n", "create_dataset_response = personalize.create_dataset(\n", "    name = \"cpg-weather-dataset-items\",\n", "    datasetType = dataset_type,\n", "    datasetGroupArn = dataset_group_arn,\n", "    schemaArn = items_schema_arn\n", ")\n", "\n", "items_dataset_arn = create_dataset_response['datasetArn']\n", "print(json.dumps(create_dataset_response, indent=2))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Create Users Dataset" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dataset_type = \"USERS\"\n", "create_dataset_response = personalize.create_dataset(\n", "    name = \"cpg-weather-dataset-users\",\n", "    datasetType = dataset_type,\n", "    datasetGroupArn = dataset_group_arn,\n", "    schemaArn = users_schema_arn\n", ")\n", "\n", "users_dataset_arn = create_dataset_response['datasetArn']\n", "print(json.dumps(create_dataset_response, indent=2))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Create Interactions Dataset" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dataset_type = \"INTERACTIONS\"\n", "create_dataset_response = personalize.create_dataset(\n", "    name = \"cpg-weather-dataset-interactions\",\n", "    datasetType = dataset_type,\n", "    datasetGroupArn = dataset_group_arn,\n", "    schemaArn = interactions_schema_arn\n", ")\n", "\n", "interactions_dataset_arn = create_dataset_response['datasetArn']\n", "print(json.dumps(create_dataset_response, indent=2))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Import Datasets to Personalize\n", "\n", "Up to this point we have generated CSVs containing data for our users, items, and interactions and staged them in an S3 bucket. We also created schemas in Personalize that define the columns in our CSVs. Then we created a dataset group and three datasets in Personalize that will receive our data. In the following steps we will create import jobs with Personalize that will import the datasets from our S3 bucket into the service.\n", "\n", "### Setup Permissions\n", "\n", "By default, the Personalize service does not have permission to access the data we uploaded into the S3 bucket in our account. In order to grant access to the Personalize service to read our CSVs, we need to set a Bucket Policy and create an IAM role that the Amazon Personalize service will assume."
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Attach Policy to S3 Bucket" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3 = boto3.client(\"s3\")\n", "\n", "policy = {\n", "    \"Version\": \"2012-10-17\",\n", "    \"Id\": \"PersonalizeS3BucketAccessPolicy\",\n", "    \"Statement\": [\n", "        {\n", "            \"Sid\": \"PersonalizeS3BucketAccessPolicy\",\n", "            \"Effect\": \"Allow\",\n", "            \"Principal\": {\n", "                \"Service\": \"personalize.amazonaws.com\"\n", "            },\n", "            \"Action\": [\n", "                \"s3:GetObject\",\n", "                \"s3:ListBucket\"\n", "            ],\n", "            \"Resource\": [\n", "                \"arn:aws:s3:::{}\".format(bucket),\n", "                \"arn:aws:s3:::{}/*\".format(bucket)\n", "            ]\n", "        }\n", "    ]\n", "}\n", "\n", "s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy));" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Create S3 Read Only Access Role" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iam = boto3.client(\"iam\")\n", "\n", "role_name = 'CPG-PersonalizeS3'\n", "assume_role_policy_document = {\n", "    \"Version\": \"2012-10-17\",\n", "    \"Statement\": [\n", "        {\n", "            \"Effect\": \"Allow\",\n", "            \"Principal\": {\n", "                \"Service\": \"personalize.amazonaws.com\"\n", "            },\n", "            \"Action\": \"sts:AssumeRole\"\n", "        }\n", "    ]\n", "}\n", "\n", "create_role_response = iam.create_role(\n", "    RoleName = role_name,\n", "    AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)\n", ");\n", "\n", "iam.attach_role_policy(\n", "    RoleName = role_name,\n", "    PolicyArn = \"arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess\"\n", ");\n", "\n", "role_arn = create_role_response[\"Role\"][\"Arn\"]\n", "print('IAM Role: {}'.format(role_arn))\n", "# Pause to allow role to fully persist\n", "time.sleep(10)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Create Import Jobs\n", "\n", "With the permissions in place to allow Personalize to access our CSV files, let's create three import jobs to import each file into its respective dataset. Each import job can take several minutes to complete, so we'll create all three and then wait for them all to complete."
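, "\n", "Since the three import jobs below only differ by job name, dataset, and file name, you could also wrap the call in a small helper (an optional sketch, equivalent to the three cells that follow):\n", "\n", "```python\n", "# Optional helper: create a dataset import job for a given dataset and CSV file\n", "def create_import_job(job_name, dataset_arn, filename):\n", "    response = personalize.create_dataset_import_job(\n", "        jobName = job_name,\n", "        datasetArn = dataset_arn,\n", "        dataSource = {'dataLocation': 's3://{}/{}'.format(bucket, filename)},\n", "        roleArn = role_arn\n", "    )\n", "    return response['datasetImportJobArn']\n", "```\n"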
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Items Dataset Import Job" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "items_create_dataset_import_job_response = personalize.create_dataset_import_job(\n", "    jobName = \"cpg-weather-dataset-items-import-job\",\n", "    datasetArn = items_dataset_arn,\n", "    dataSource = {\n", "        \"dataLocation\": \"s3://{}/{}\".format(bucket, items_filename)\n", "    },\n", "    roleArn = role_arn\n", ")\n", "\n", "items_dataset_import_job_arn = items_create_dataset_import_job_response['datasetImportJobArn']\n", "print(json.dumps(items_create_dataset_import_job_response, indent=2))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Users Dataset Import Job" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "users_create_dataset_import_job_response = personalize.create_dataset_import_job(\n", "    jobName = \"cpg-weather-dataset-users-import-job\",\n", "    datasetArn = users_dataset_arn,\n", "    dataSource = {\n", "        \"dataLocation\": \"s3://{}/{}\".format(bucket, users_filename)\n", "    },\n", "    roleArn = role_arn\n", ")\n", "\n", "users_dataset_import_job_arn = users_create_dataset_import_job_response['datasetImportJobArn']\n", "print(json.dumps(users_create_dataset_import_job_response, indent=2))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Interactions Dataset Import Job" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interactions_create_dataset_import_job_response = personalize.create_dataset_import_job(\n", "    jobName = \"cpg-weather-dataset-interactions-import-job\",\n", "    datasetArn = interactions_dataset_arn,\n", "    dataSource = {\n", "        \"dataLocation\": \"s3://{}/{}\".format(bucket, interactions_filename)\n", "    },\n", "    roleArn = role_arn\n", ")\n", "\n", "interactions_dataset_import_job_arn = interactions_create_dataset_import_job_response['datasetImportJobArn']\n", "print(json.dumps(interactions_create_dataset_import_job_response, indent=2))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Wait for Import Jobs to Complete\n", "\n", "It will take 10-15 minutes for the import jobs to complete. While you're waiting, you can learn more about datasets and schemas here: https://docs.aws.amazon.com/personalize/latest/dg/how-it-works-dataset-schema.html\n", "\n", "We will wait for all three jobs to finish."
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Wait for All Import Jobs to Finish" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "import_job_arns = [ items_dataset_import_job_arn, users_dataset_import_job_arn, interactions_dataset_import_job_arn ]\n", "\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time:\n", "    for job_arn in reversed(import_job_arns):\n", "        import_job_response = personalize.describe_dataset_import_job(\n", "            datasetImportJobArn = job_arn\n", "        )\n", "        status = import_job_response[\"datasetImportJob\"]['status']\n", "\n", "        if status == \"ACTIVE\":\n", "            print(f'Import job {job_arn} successfully completed')\n", "            import_job_arns.remove(job_arn)\n", "        elif status == \"CREATE FAILED\":\n", "            print(f'Import job {job_arn} failed')\n", "            if import_job_response[\"datasetImportJob\"].get('failureReason'):\n", "                print(' Reason: ' + import_job_response[\"datasetImportJob\"]['failureReason'])\n", "            import_job_arns.remove(job_arn)\n", "\n", "    if len(import_job_arns) > 0:\n", "        print('At least one dataset import job still in progress')\n", "        time.sleep(60)\n", "    else:\n", "        print(\"All import jobs have ended\")\n", "        break" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Create Solutions\n", "\n", "With our three datasets imported into our dataset group, we can now turn to training models. \n", "When creating a solution, you provide your dataset group and the recipe for training. Let's declare the recipe that we will need for our solution." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### List Recipes\n", "\n", "First, let's list all available recipes." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "list_recipes_response = personalize.list_recipes()\n", "list_recipes_response" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "As you can see above, there are several recipes to choose from. For this workshop we will use the User-Personalization recipe to generate product recommendations." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Declare Personalize Recipe for Product Recommendations\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "recommend_recipe_arn = \"arn:aws:personalize:::recipe/aws-user-personalization\"" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Create Solution and Solution Version\n", "\n", "With our recipe defined, we can now create our solution and solution version."
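, "\n", "If you later want to experiment with hyperparameters, `create_solution` also accepts a `performHPO` flag that asks Personalize to run hyperparameter optimization during training (an optional variation, not used in the rest of this workshop; the solution name below is just an example):\n", "\n", "```python\n", "# Optional variation: enable hyperparameter optimization when creating the solution\n", "hpo_solution_response = personalize.create_solution(\n", "    name = 'cpg-weather-product-personalization-hpo',  # example name\n", "    datasetGroupArn = dataset_group_arn,\n", "    recipeArn = recommend_recipe_arn,\n", "    performHPO = True\n", ")\n", "```\n"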
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Product Recommendation Solution" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "create_solution_response = personalize.create_solution(\n", "    name = \"cpg-weather-product-personalization\",\n", "    datasetGroupArn = dataset_group_arn,\n", "    recipeArn = recommend_recipe_arn\n", ")\n", "\n", "recommend_solution_arn = create_solution_response['solutionArn']\n", "print(json.dumps(create_solution_response, indent=2))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Product Recommendation Solution Version" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "create_solution_version_response = personalize.create_solution_version(\n", "    solutionArn = recommend_solution_arn\n", ")\n", "\n", "recommend_solution_version_arn = create_solution_version_response['solutionVersionArn']\n", "print(json.dumps(create_solution_version_response, indent=2))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Wait for Solution Version to Complete\n", "\n", "It can take 40-60 minutes for the solution version to be created. During this process a model is being trained and tested with the data contained within your datasets. The duration of training jobs can increase based on the size of the dataset, the training parameters, and whether AutoML is used instead of manually selecting a recipe. Below we will wait for the solution version to finish training.\n", "\n", "While you are waiting for this process to complete you can learn more about solutions here: https://docs.aws.amazon.com/personalize/latest/dg/training-deploying-solutions.html" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Wait for Product Recommendation Solution Version to Have ACTIVE Status" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time:\n", "    soln_ver_response = personalize.describe_solution_version(\n", "        solutionVersionArn = recommend_solution_version_arn\n", "    )\n", "    status = soln_ver_response[\"solutionVersion\"][\"status\"]\n", "    print(status)\n", "\n", "    if status == \"ACTIVE\":\n", "        print(f'Solution version {recommend_solution_version_arn} successfully completed')\n", "        break\n", "    elif status == \"CREATE FAILED\":\n", "        print(f'Solution version {recommend_solution_version_arn} failed')\n", "        if soln_ver_response[\"solutionVersion\"].get('failureReason'):\n", "            print(' Reason: ' + soln_ver_response[\"solutionVersion\"]['failureReason'])\n", "        break\n", "\n", "    time.sleep(10)\n" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Evaluate Offline Metrics for Solution Version\n", "\n", "Amazon Personalize provides [offline metrics](https://docs.aws.amazon.com/personalize/latest/dg/working-with-training-metrics.html#working-with-training-metrics-metrics) that allow you to evaluate the performance of the solution version before you deploy the model in your application. Metrics can also be used to view the effects of modifying a solution's hyperparameters or to compare metrics between solutions that use the same training data but were created with different recipes.\n", "\n", "Let's retrieve the metrics for the solution version we just created."
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Product Recommendations Metrics" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "get_solution_metrics_response = personalize.get_solution_metrics(\n", "    solutionVersionArn = recommend_solution_version_arn\n", ")\n", "\n", "print(json.dumps(get_solution_metrics_response, indent=2))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Create Campaign\n", "\n", "Once we're satisfied with our solution version, we need to create a campaign for it. When creating a campaign you specify the minimum transactions per second (`minProvisionedTPS`) that you expect to make against the service for this campaign. Personalize will automatically scale the inference endpoint up and down for the campaign to match demand but will never scale below `minProvisionedTPS`.\n", "\n", "Let's create a campaign for our solution version with `minProvisionedTPS` set to 1." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Product Recommendation Campaign" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "create_campaign_response = personalize.create_campaign(\n", "    name = \"cpg-weather-product-personalization\",\n", "    solutionVersionArn = recommend_solution_version_arn,\n", "    minProvisionedTPS = 1\n", ")\n", "\n", "recommend_campaign_arn = create_campaign_response['campaignArn']\n", "print(json.dumps(create_campaign_response, indent=2))" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Wait for Product Recommendation Campaign to Have ACTIVE Status\n", "\n", "It can take 20-30 minutes for the campaign to be fully created.\n", "\n", "While you are waiting for this to complete you can learn more about campaigns here: https://docs.aws.amazon.com/personalize/latest/dg/campaigns.html" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time:\n", "    campaign_response = personalize.describe_campaign(\n", "        campaignArn = recommend_campaign_arn\n", "    )\n", "    status = campaign_response[\"campaign\"][\"status\"]\n", "    print(status)\n", "\n", "    if status == \"ACTIVE\":\n", "        print(f'Campaign {recommend_campaign_arn} successfully completed')\n", "        break\n", "    elif status == \"CREATE FAILED\":\n", "        print(f'Campaign {recommend_campaign_arn} failed')\n", "        if campaign_response[\"campaign\"].get('failureReason'):\n", "            print(' Reason: ' + campaign_response[\"campaign\"]['failureReason'])\n", "        break\n", "\n", "    time.sleep(10)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Test Campaign\n", "\n", "Now that our campaign has been fully created, let's test it and evaluate the results." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Test Product Recommendations Campaign\n", "\n", "Let's test the recommendations made by the product recommendations campaign by selecting a user from the users dataset and requesting item recommendations for that user." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Select a User\n", "\n", "We'll just pick a user for simplicity. Feel free to change the `user_id` below and execute the following cells with a different user to get a sense for how the recommendations change."
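, "\n", "If you would rather sample a user at random instead of using one of the IDs suggested in the next cell, here is a quick optional sketch:\n", "\n", "```python\n", "# Optional: pick a random user instead of a fixed ID\n", "user_id = int(users_df.sample().iloc[0]['id'])\n", "user_id\n", "```\n"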
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# User with interactions: 170\n", "# Cold-start user: 7000\n", "user_id = 7000\n", "users_df.loc[users_df['id'] == user_id]" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "**Take note of the `persona` value for the user above. We should see recommendations for products consistent with this persona since we generated historical interactions for products in the categories represented in the persona.**" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### Get Product Recommendations for User\n", "\n", "Now let's call Amazon Personalize to get recommendations for our user from the product recommendations campaign." ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "get_recommendations_response = personalize_runtime.get_recommendations(\n", "    campaignArn = recommend_campaign_arn,\n", "    userId = str(user_id),\n", "    numResults = 5)\n", "\n", "item_list = get_recommendations_response['itemList']\n", "print(json.dumps(item_list, indent=4))\n", "search_items_in_dataframe(item_list)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Are the recommended products consistent with the persona? Note that this is a rather contrived example using a limited amount of generated interaction data without model parameter tuning. The purpose is to give you hands-on experience building models and retrieving inferences from Amazon Personalize." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Contextual Recommendations\n", "\n", "Now let's explore passing contextual information in the recommendation call. Context can be any attribute included in the interactions dataset used to train the solution. In our case we included the average daily temperature in Santiago de Chile, extracted from the WeatherTrends360 dataset: https://aws.amazon.com/marketplace/pp/prodview-4htmi6srh7zve?qid=1603293303750&sr=0-10&ref_=srh_res_product_title#overview.\n", "\n", "If you want to build your own solutions, feel free to explore other datasets on the AWS Marketplace.\n", "\n", "Other useful contextual information includes the device or trade channel used for the interaction and similar metadata. More information: https://aws.amazon.com/blogs/machine-learning/increasing-the-relevance-of-your-amazon-personalize-recommendations-by-leveraging-contextual-information/\n", "\n", "Let's select a user and test the recommendations for the included temperature ranges.\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "user_id = 7000\n", "users_dataset_df.loc[users_dataset_df['USER_ID'] == user_id]\n" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Recommendations of products on 'hot' days. Feel free to explore other values\n", "# like: 'very cold', 'cold', 'slightly cold', 'lukewarm', or 'very hot'\n", "\n", "get_recommendations_response = personalize_runtime.get_recommendations(\n", "    campaignArn = recommend_campaign_arn,\n", "    userId = str(user_id),\n", "    numResults = 5,\n", "    context = {\n", "        'DAILY_TEMPERATURE': 'hot'\n", "    })\n", "\n", "item_list = get_recommendations_response['itemList']\n", "print(json.dumps(item_list, indent=4))\n", "search_items_in_dataframe(item_list)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "Are the recommended items different from the previous calls? 
Try different users, context values, and numbers of recommended items to get a feel for the behavior." ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "## Workshop Complete\n", "\n", "Congratulations! You have completed the contextual Weather Personalization Workshop.\n", "\n", "### Cleanup\n", "\n", "You MUST run the cleanup notebook or manually clean up these resources. If using the cleanup notebook, store the identifiers of the resources to be cleaned up:" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store dataset_group_arn\n", "%store items_dataset_arn\n", "%store users_dataset_arn\n", "%store interactions_dataset_arn\n", "%store role_arn\n", "%store users_dataset_import_job_arn\n", "%store interactions_dataset_import_job_arn\n", "%store items_dataset_import_job_arn\n", "%store recommend_campaign_arn" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_amazonei_mxnet_p36", "language": "python", "name": "conda_amazonei_mxnet_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.10" } }, "nbformat": 4, "nbformat_minor": 4 }