{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# CPG Industry - Personalization Workshop\n", "\n", "Welcome to the CPG Industry Personalization Workshop. In this module we're going to be adding three core personalization features powered by [Amazon Personalize](https://aws.amazon.com/personalize/): related product recommendations on the product detail page, personalized recommendations, and personalized ranking of items. This will allow us to give our users targeted recommendations based on their activity.\n", "This workshop reuse a lot of code and behaviour from Retail Demo Store, if you want to expand to explore retail related cases take a look at: https://github.com/aws-samples/retail-demo-store\n", "\n", "Recommended Time: 2 Hours\n", "\n", "**Note**: This notebook is an example of a [Custom Dataset Group and associated resources](https://docs.aws.amazon.com/personalize/latest/dg/custom-dataset-groups.html), please refer to the documentation for more information on [Domain Dataset Groups and Recommenders](https://docs.aws.amazon.com/personalize/latest/dg/domain-dataset-groups.html)." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Setup\n", "\n", "To get started, we need to perform a bit of setup. Walk through each of the following steps to configure your environment to interact with the Amazon Personalize Service." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Import Dependencies and Setup Boto3 Python Clients\n", "\n", "Throughout this workshop we will need access to some common libraries and clients for connecting to AWS services. We also have to retrieve Uid from a SageMaker notebook instance tag." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Import Dependencies\n", "\n", "import boto3\n", "import json\n", "import pandas as pd\n", "import numpy as np\n", "import seaborn as sns\n", "import matplotlib.pyplot as plt\n", "import time\n", "import requests\n", "import csv\n", "import sys\n", "import botocore\n", "import uuid\n", "\n", "from packaging import version\n", "from random import randint\n", "from botocore.exceptions import ClientError\n", "\n", "%matplotlib inline\n", "\n", "# Setup Clients\n", "\n", "personalize = boto3.client('personalize')\n", "personalize_runtime = boto3.client('personalize-runtime')\n", "personalize_events = boto3.client('personalize-events')\n", "s3 = boto3.client('s3')\n", "\n", "with open('/opt/ml/metadata/resource-metadata.json') as f:\n", " data = json.load(f)\n", "sagemaker = boto3.client('sagemaker')\n", "sagemakerResponce = sagemaker.list_tags(ResourceArn=data[\"ResourceArn\"])\n", "for tag in sagemakerResponce[\"Tags\"]:\n", " if tag['Key'] == 'Uid':\n", " Uid = tag['Value']\n", " break" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Configure Bucket and Data Output Location" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We will be configuring some variables that will store the location of our source data. Substitute the name of the bucket we will create later with your own. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "with open('/opt/ml/metadata/resource-metadata.json') as notebook_info:\n", " data = json.load(notebook_info)\n", " resource_arn = data['ResourceArn']\n", " region = resource_arn.split(':')[3]\n", "print(region)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "account_id = boto3.client('sts').get_caller_identity().get('Account')\n", "bucket = account_id + \"-\" + region + \"-\" + \"cpg-personalize-datasets\" # Creating a unique bucket\n", "items_filename = \"items.csv\" # Do Not Change\n", "users_filename = \"users.csv\" # Do Not Change\n", "interactions_filename = \"interactions.csv\" # Do Not Change\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Get, Prepare, and Upload User, Product, and Interaction Data\n", "\n", "Amazon Personalize provides predefined recipes, based on common use cases, for training models. A recipe is a machine learning algorithm that you use with settings, or hyperparameters, and the data you provide to train an Amazon Personalize model. The data you provide to train a model are organized into separate datasets by the type of data being provided. A collection of datasets are organized into a dataset group. The three dataset types supported by Personalize are items, users, and interactions. Depending on the recipe type you choose, a different combination of dataset types are required. For all recipe types, an interactions dataset is required. Interactions represent how users interact with items. For example, viewing a product, watching a video, listening to a recording, or reading an article. For this workshop, we will be using a recipe that supports all three dataset types.\n", "\n", "First we need to create a bucket to store the datasets for Personalize to consume them. \n", "\n", "Let's get started." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "try: \n", " if region == \"us-east-1\":\n", " s3.create_bucket(Bucket=bucket)\n", " else:\n", " s3.create_bucket(\n", " Bucket=bucket,\n", " CreateBucketConfiguration={'LocationConstraint': region}\n", " )\n", "except s3.exceptions.BucketAlreadyOwnedByYou:\n", " print(\"Bucket already exists. Using bucket\", bucket_name)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Download and Explore and clean the Products Dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "products_df = pd.read_csv('../../automation/ml_ops/domain/CPG/data/metadata/items-origin.csv')\n", "pd.set_option('display.max_rows', 5)\n", "products_df\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Clean the product dataset and drop columms we don't need.\n", "\n", "First, business sent us this data but the information in the gender_affinity column is not reliable. We are going to replace all the M and F values for a np.nan to avoid noise on our model. \n", "Also we are going to drop all the columns with non relevant data for us. \n", "Let's get started." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "products_df['gender_affinity'] = np.nan\n", "products_df = products_df[['id','name','category','type', 'size', 'gender_affinity', 'sugar']]\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# display the new products_df\n", "print(products_df.info())\n", "display(products_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Prepare product Data\n", "\n", "When training models in Amazon Personalize, we can provide meta data about our items. For this workshop we will copy each product's category and style to the item dataset. The product's unique identifier is required. Then we will rename the columns in our dataset to match our schema (defined later) and those expected by Personalize. Finally, we will save our dataset as a CSV and copy it to our S3 bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "products_dataset_df = products_df[['id','category','type', 'size', 'sugar']]\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "products_dataset_df = products_dataset_df.rename(columns = {'id':'ITEM_ID','category':'CATEGORY','type':'TYPE', 'size':'SIZE', 'sugar':'SUGAR'}) \n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "products_dataset_df.head(5)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Let us look at unique categories\n", "products_dataset_df['CATEGORY'].unique()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Let us look at unique sugar contents\n", "products_dataset_df['SUGAR'].unique()" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "products_dataset_df.to_csv(items_filename, index=False)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Download and Explore the Users Dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "users_df = pd.read_csv('../../automation/ml_ops/domain/CPG/data/metadata/users-origin.csv')\n", "pd.set_option('display.max_rows', 5)\n", "users_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Prepare products Data\n", "\n", "Similar to the items dataset we created above, we can provide metadata on our users when training models in Personalize. For this workshop we will include each user's age and gender. As before, we will name the columns to match our schema, save the data as a CSV, and upload to our S3 bucket." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "users_dataset_df = users_df[['id','age','gender']]\n", "users_dataset_df = users_dataset_df.rename(columns = {'id':'USER_ID','age':'AGE','gender':'GENDER'}) \n", "users_dataset_df.head(5)\n", "\n", "users_dataset_df.to_csv(users_filename, index=False)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "users_dataset_df.info()\n", "display(users_dataset_df)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create User-Items Interactions Dataset\n", "\n", "To mimic user behavior, we will be generating a new dataset that represents user interactions with items. 
To make the interactions more realistic, we will use a predefined shopper persona for each user to generate event types for products matching that persona. Each persona is composed of 3 categories, separated by the symbol \"_\". \n", "The upsampling process will create events for viewing products, adding products to a cart, viewing the cart, starting checkout, and completing orders." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "# Minimum number of interactions to generate\n", "min_interactions = 500000\n", "\n", "# Percentages of each event type to generate\n", "product_added_percent = .08\n", "cart_viewed_percent = .05\n", "checkout_started_percent = .02\n", "order_completed_percent = .01\n", "\n", "# Count of interactions generated for each event type\n", "product_viewed_count = 0\n", "product_added_count = 0\n", "cart_viewed_count = 0\n", "checkout_started_count = 0\n", "order_completed_count = 0\n", "\n", "# How many days in the past (from now) to start generating interactions\n", "days_back = 90\n", "\n", "start_time = int(time.time())\n", "next_timestamp = start_time - (days_back * 24 * 60 * 60)\n", "seconds_increment = int((start_time - next_timestamp) / min_interactions)\n", "next_update = start_time + 60\n", "\n", "assert seconds_increment > 0, \"Increase days_back or reduce min_interactions\"\n", "\n", "print('Minimum interactions to generate: {}'.format(min_interactions))\n", "print('Days back: {}'.format(days_back))\n", "print('Starting timestamp: {} ({})'.format(next_timestamp, time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(next_timestamp))))\n", "print('Seconds increment: {}'.format(seconds_increment))\n", "\n", "print(\"Generating interactions... (this may take a few minutes)\")\n", "interactions = 0\n", "\n", "subsets_cache = {}\n", "\n", "with open(interactions_filename, 'w') as outfile:\n", "    f = csv.writer(outfile)\n", "    f.writerow([\"ITEM_ID\", \"USER_ID\", \"EVENT_TYPE\", \"TIMESTAMP\", \"ITEM_SUGAR_LEVEL\"])\n", "\n", "    while interactions < min_interactions:\n", "        if (time.time() > next_update):\n", "            rate = interactions / (time.time() - start_time)\n", "            to_go = (min_interactions - interactions) / rate\n", "            print('Generated {} interactions so far ({:0.2f} seconds to go)'.format(interactions, to_go))\n", "            next_update += 60\n", "\n", "        # Pick a random user\n", "        user = users_df.sample().iloc[0]\n", "\n", "        # Determine category affinity from user's persona\n", "        persona = user['persona']\n", "        preferred_categories = persona.split('_')\n", "\n", "        # Select category based on weighted preference of category order.\n", "        category = np.random.choice(preferred_categories, 1, p=[0.6, 0.25, 0.15])[0]\n", "\n", "        gender = user['gender']\n", "\n", "        # Check if subset data frame is already cached for category & gender\n", "        prods_subset_df = subsets_cache.get(category + gender)\n", "        if prods_subset_df is None:\n", "            # Select products from selected category without gender affinity or that match user's gender\n", "            prods_subset_df = products_df.loc[(products_df['category'] == category) & ((products_df['gender_affinity'] == gender) | (products_df['gender_affinity'].isnull()))]\n", "            # Update cache\n", "            subsets_cache[category + gender] = prods_subset_df\n", "\n", "        # Pick a random product from gender filtered subset\n", "        product = prods_subset_df.sample().iloc[0]\n", "\n", "        this_timestamp = next_timestamp + randint(0, seconds_increment)\n", "\n", "        f.writerow([product['id'],\n", "                    user['id'],\n", "                    'ProductViewed',\n", "                    
this_timestamp,\n", "                    product['sugar']])\n", "\n", "        next_timestamp += seconds_increment\n", "        product_viewed_count += 1\n", "        interactions += 1\n", "\n", "        if product_added_count < int(product_viewed_count * product_added_percent):\n", "            this_timestamp += randint(0, int(seconds_increment / 2))\n", "            f.writerow([product['id'],\n", "                        user['id'],\n", "                        'ProductAdded',\n", "                        this_timestamp,\n", "                        product['sugar']])\n", "            interactions += 1\n", "            product_added_count += 1\n", "\n", "        if cart_viewed_count < int(product_viewed_count * cart_viewed_percent):\n", "            this_timestamp += randint(0, int(seconds_increment / 2))\n", "            f.writerow([product['id'],\n", "                        user['id'],\n", "                        'CartViewed',\n", "                        this_timestamp,\n", "                        product['sugar']])\n", "            interactions += 1\n", "            cart_viewed_count += 1\n", "\n", "        if checkout_started_count < int(product_viewed_count * checkout_started_percent):\n", "            this_timestamp += randint(0, int(seconds_increment / 2))\n", "            f.writerow([product['id'],\n", "                        user['id'],\n", "                        'CheckoutStarted',\n", "                        this_timestamp,\n", "                        product['sugar']])\n", "            interactions += 1\n", "            checkout_started_count += 1\n", "\n", "        if order_completed_count < int(product_viewed_count * order_completed_percent):\n", "            this_timestamp += randint(0, int(seconds_increment / 2))\n", "            # Include the sugar level here too so ITEM_SUGAR_LEVEL is populated for every event type\n", "            f.writerow([product['id'],\n", "                        user['id'],\n", "                        'OrderCompleted',\n", "                        this_timestamp,\n", "                        product['sugar']])\n", "            interactions += 1\n", "            order_completed_count += 1\n", "\n", "print(\"Done\")\n", "print(\"Total interactions: \" + str(interactions))\n", "print(\"Total product viewed: \" + str(product_viewed_count))\n", "print(\"Total product added: \" + str(product_added_count))\n", "print(\"Total cart viewed: \" + str(cart_viewed_count))\n", "print(\"Total checkout started: \" + str(checkout_started_count))\n", "print(\"Total order completed: \" + str(order_completed_count))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Open and Explore the Interactions Dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interactions_df = pd.read_csv(interactions_filename)\n", "interactions_df" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Chart the counts of each `EVENT_TYPE` generated for the interactions dataset. We're simulating a site where visitors heavily view/browse products and, to a lesser degree, add products to their cart and check out." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "categorical_attributes = interactions_df.select_dtypes(include = ['object'])\n", "\n", "plt.figure(figsize=(16,3))\n", "sns.countplot(data = categorical_attributes, x = 'EVENT_TYPE')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Upload Data\n", "Now we will upload the data we prepared (currently saved in our local folder) to Amazon S3." 
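 ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "For reference, the cell below simply prints the S3 locations where each file will live after the upload; these are the same locations the dataset import jobs will read from later in the notebook. Nothing is uploaded yet." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Informational only: show the target S3 location for each dataset file\n", "for filename in [items_filename, users_filename, interactions_filename]:\n", "    print('s3://{}/{}'.format(bucket, filename))"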
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "boto3.Session().resource('s3').Bucket(bucket).Object(interactions_filename).upload_file(interactions_filename)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "boto3.Session().resource('s3').Bucket(bucket).Object(items_filename).upload_file(items_filename)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "boto3.Session().resource('s3').Bucket(bucket).Object(users_filename).upload_file(users_filename)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Configure Amazon Personalize\n", "\n", "Now that we've prepared our three datasets and uploaded them to S3 we'll need to configure the Amazon Personalize service to understand our data so that it can be used to train models for generating recommendations.\n", "\n", "## Create a dataset group\n", "\n", "The highest level of isolation and abstraction with Amazon Personalize is a *dataset group*. Information stored within one of these dataset groups has no impact on any other dataset group or models created from one - they are completely isolated. This allows you to run many experiments and is part of how we keep your models private and fully trained only on your data. \n", "\n", "Before importing the data prepared earlier, there needs to be a dataset group and a dataset added to it that handles the interactions.\n", "\n", "Dataset groups can house the following types of information:\n", "\n", "* User-item-interactions\n", "* Event streams (real-time interactions)\n", "* User metadata\n", "* Item metadata\n", "\n", "Before we create the dataset group and the dataset for our interaction data, let's validate that your environment can communicate successfully with Amazon Personalize." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Configure the SDK to Personalize:\n", "personalize = boto3.client('personalize')\n", "personalize_runtime = boto3.client('personalize-runtime')\n", "print(\"We can communicate with Personalize!\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create and Wait for Dataset Group\n", "\n", "Next we need to create the dataset group that will contain our three datasets." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Dataset Group" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "create_dataset_group_response = personalize.create_dataset_group(\n", " name = 'cgp-dataset'\n", ")\n", "dataset_group_arn = create_dataset_group_response['datasetGroupArn']\n", "print(json.dumps(create_dataset_group_response, indent=2))\n", "\n", "print(f'DatasetGroupArn = {dataset_group_arn}')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Wait for Dataset Group to Have ACTIVE Status" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Before we can use the dataset group, it must be active. This can take a minute or two. Execute the cell below and wait for it to show the ACTIVE status. It checks the status of the dataset group every 15s, up to a maximum of 3 hours." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = None\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time:\n", " describe_dataset_group_response = personalize.describe_dataset_group(\n", " datasetGroupArn = dataset_group_arn\n", " )\n", " status = describe_dataset_group_response[\"datasetGroup\"][\"status\"]\n", " print(\"DatasetGroup: {}\".format(status))\n", " \n", " if status == \"ACTIVE\" or status == \"CREATE FAILED\":\n", " break\n", " \n", " time.sleep(15)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Schemas for Datasets\n", "\n", "Amazon Personalize requires a schema for each dataset so it can map the columns in our CSVs to fields for model training. Each schema is declared in JSON using the [Apache Avro](https://avro.apache.org/) format. More detailed information can be found in the [documentation](https://docs.aws.amazon.com/personalize/latest/dg/how-it-works-dataset-schema.html).\n", "\n", "Let's define and create schemas in Personalize for our datasets." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Items Datsaset Schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "items_schema = {\n", " \"type\": \"record\",\n", " \"name\": \"Items\",\n", " \"namespace\": \"com.amazonaws.personalize.schema\",\n", " \"fields\": [\n", " {\n", " \"name\": \"ITEM_ID\",\n", " \"type\": \"string\"\n", " },\n", " {\n", " \"name\": \"CATEGORY\",\n", " \"type\": \"string\",\n", " \"categorical\": True\n", " },\n", " {\n", " \"name\": \"TYPE\",\n", " \"type\": \"string\",\n", " \"categorical\": True\n", " },\n", " {\n", " \"name\": \"SIZE\",\n", " \"type\": \"string\",\n", " \"categorical\": True\n", " },\n", " {\n", " \"name\": \"SUGAR\",\n", " \"type\": \"string\",\n", " \"categorical\": True\n", " }\n", " ],\n", " \"version\": \"1.0\"\n", "}\n", "\n", "create_schema_response = personalize.create_schema(\n", " name = \"cpg-schema-items-1\",\n", " schema = json.dumps(items_schema)\n", ")\n", "\n", "items_schema_arn = create_schema_response['schemaArn']\n", "print(json.dumps(create_schema_response, indent=2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Users Dataset Schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "users_schema = {\n", " \"type\": \"record\",\n", " \"name\": \"Users\",\n", " \"namespace\": \"com.amazonaws.personalize.schema\",\n", " \"fields\": [\n", " {\n", " \"name\": \"USER_ID\",\n", " \"type\": \"string\"\n", " },\n", " {\n", " \"name\": \"AGE\",\n", " \"type\": \"int\"\n", " },\n", " {\n", " \"name\": \"GENDER\",\n", " \"type\": \"string\",\n", " \"categorical\": True\n", " }\n", " ],\n", " \"version\": \"1.0\"\n", "}\n", "\n", "create_schema_response = personalize.create_schema(\n", " name = \"cpg-schema-users-1\",\n", " schema = json.dumps(users_schema)\n", ")\n", "\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "users_schema_arn = create_schema_response['schemaArn']\n", "print(json.dumps(create_schema_response, indent=2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Interactions Dataset Schema" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interactions_schema = {\n", " \"type\": \"record\",\n", " \"name\": \"Interactions\",\n", " \"namespace\": \"com.amazonaws.personalize.schema\",\n", " 
\"fields\": [\n", " {\n", " \"name\": \"ITEM_ID\",\n", " \"type\": \"string\"\n", " },\n", " {\n", " \"name\": \"USER_ID\",\n", " \"type\": \"string\"\n", " },\n", " {\n", " \"name\": \"EVENT_TYPE\",\n", " \"type\": \"string\"\n", " },\n", " {\n", " \"name\": \"TIMESTAMP\",\n", " \"type\": \"long\"\n", " },\n", " {\n", " \"name\": \"ITEM_SUGAR_LEVEL\",\n", " \"type\": \"string\",\n", " \"categorical\": True\n", " }\n", " ],\n", " \"version\": \"1.0\"\n", "}\n", "\n", "create_schema_response = personalize.create_schema(\n", " name = \"cpg-schema-interactions-1\",\n", " schema = json.dumps(interactions_schema)\n", ")\n", "\n", "interactions_schema_arn = create_schema_response['schemaArn']\n", "print(json.dumps(create_schema_response, indent=2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Working in different sessions or connection issues. \n", "If you are doing the workshop in an AWS event or finishing it in single session please ignore this section. \n", "In case you lose connection or reset the kernel and want to resume from where you left it, a lot of the variables needs to be instantiated again, and usually are the arns of the resources you created before. Below an example." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "### Only if you are doing this in several sessions or restarted the kernel. \n", "### Make sure you update with the ARNs you created above!!!\n", "#items_schema_arn = 'arn:aws:personalize:us-east-1:444208467160:schema/cpg-schema-items-v2'\n", "#users_schema_arn = 'arn:aws:personalize:us-east-1:444208467160:schema/cpg-schema-users'\n", "#interactions_schema_arn = 'arn:aws:personalize:us-east-1:444208467160:schema/cpg-interactions'\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Items Dataset\n", "\n", "Next we will create the datasets in Personalize for our three dataset types. Let's start with the items dataset." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dataset_type = \"ITEMS\"\n", "create_dataset_response = personalize.create_dataset(\n", " name = \"cpg-dataset-items\",\n", " datasetType = dataset_type,\n", " datasetGroupArn = dataset_group_arn,\n", " schemaArn = items_schema_arn\n", ")\n", "\n", "items_dataset_arn = create_dataset_response['datasetArn']\n", "print(json.dumps(create_dataset_response, indent=2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Users Dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dataset_type = \"USERS\"\n", "create_dataset_response = personalize.create_dataset(\n", " name = \"cpg-dataset-users\",\n", " datasetType = dataset_type,\n", " datasetGroupArn = dataset_group_arn,\n", " schemaArn = users_schema_arn\n", ")\n", "\n", "users_dataset_arn = create_dataset_response['datasetArn']\n", "print(json.dumps(create_dataset_response, indent=2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Interactions Dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dataset_type = \"INTERACTIONS\"\n", "create_dataset_response = personalize.create_dataset(\n", " name = \"cpg-dataset-interactions\",\n", " datasetType = dataset_type,\n", " datasetGroupArn = dataset_group_arn,\n", " schemaArn = interactions_schema_arn\n", ")\n", "\n", "interactions_dataset_arn = create_dataset_response['datasetArn']\n", "print(json.dumps(create_dataset_response, indent=2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Import Datasets to Personalize\n", "\n", "Up to this point we have generated CSVs containing data for our users, items, and interactions and staged them in an S3 bucket. We also created schemas in Personalize that define the columns in our CSVs. Then we created a datset group and three datasets in Personalize that will receive our data. In the following steps we will create import jobs with Personalize that will import the datasets from our S3 bucket into the service.\n", "\n", "### Setup Permissions\n", "\n", "By default, the Personalize service does not have permission to acccess the data we uploaded into the S3 bucket in our account. In order to grant access to the Personalize service to read our CSVs, we need to set a Bucket Policy and create an IAM role that the Amazon Personalize service will assume." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Attach policy to S3 bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3 = boto3.client(\"s3\")\n", "\n", "policy = {\n", " \"Version\": \"2012-10-17\",\n", " \"Id\": \"PersonalizeS3BucketAccessPolicy\",\n", " \"Statement\": [\n", " {\n", " \"Sid\": \"PersonalizeS3BucketAccessPolicy\",\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\n", " \"Service\": \"personalize.amazonaws.com\"\n", " },\n", " \"Action\": [\n", " \"s3:GetObject\",\n", " \"s3:ListBucket\"\n", " ],\n", " \"Resource\": [\n", " \"arn:aws:s3:::{}\".format(bucket),\n", " \"arn:aws:s3:::{}/*\".format(bucket)\n", " ]\n", " }\n", " ]\n", "}\n", "\n", "s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy));" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create S3 Read Only Access Role" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iam = boto3.client(\"iam\")\n", "\n", "role_name = 'CPG'+\"-PersonalizeS3\"\n", "assume_role_policy_document = {\n", " \"Version\": \"2012-10-17\",\n", " \"Statement\": [\n", " {\n", " \"Effect\": \"Allow\",\n", " \"Principal\": {\n", " \"Service\": \"personalize.amazonaws.com\"\n", " },\n", " \"Action\": \"sts:AssumeRole\"\n", " }\n", " ]\n", "}\n", "\n", "create_role_response = iam.create_role(\n", " RoleName = role_name,\n", " AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)\n", ");\n", "\n", "iam.attach_role_policy(\n", " RoleName = role_name,\n", " PolicyArn = \"arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess\"\n", ");\n", "\n", "role_arn = create_role_response[\"Role\"][\"Arn\"]\n", "print('IAM Role: {}'.format(role_arn))\n", "# Pause to allow role to fully persist\n", "time.sleep(10)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Create Import Jobs\n", "\n", "With the permissions in place to allow Personalize to access our CSV files, let's create three import jobs to import each file from Amazon S3 into its respective dataset. Each import job can take several minutes to complete so we'll create all three and then wait for them all to complete." 
] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Items Dataset Import Job" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "## Only use the role if you are reusing without cleaning previous runs.\n", "# role_arn = ''\n", "\n", "items_create_dataset_import_job_response = personalize.create_dataset_import_job(\n", " jobName = \"cpg-dataset-items-import-job\",\n", " datasetArn = items_dataset_arn,\n", " dataSource = {\n", " \"dataLocation\": \"s3://{}/{}\".format(bucket, items_filename)\n", " },\n", " roleArn = role_arn\n", ")\n", "\n", "items_dataset_import_job_arn = items_create_dataset_import_job_response['datasetImportJobArn']\n", "print(json.dumps(items_create_dataset_import_job_response, indent=2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Users Dataset Import Job" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "users_create_dataset_import_job_response = personalize.create_dataset_import_job(\n", " jobName = \"cpg-dataset-users-import-job\",\n", " datasetArn = users_dataset_arn,\n", " dataSource = {\n", " \"dataLocation\": \"s3://{}/{}\".format(bucket, users_filename)\n", " },\n", " roleArn = role_arn\n", ")\n", "\n", "users_dataset_import_job_arn = users_create_dataset_import_job_response['datasetImportJobArn']\n", "print(json.dumps(users_create_dataset_import_job_response, indent=2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Interactions Dataset Import Job" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interactions_create_dataset_import_job_response = personalize.create_dataset_import_job(\n", " jobName = \"cpg-dataset-interactions-import-job\",\n", " datasetArn = interactions_dataset_arn,\n", " dataSource = {\n", " \"dataLocation\": \"s3://{}/{}\".format(bucket, interactions_filename)\n", " },\n", " roleArn = role_arn\n", ")\n", "\n", "interactions_dataset_import_job_arn = interactions_create_dataset_import_job_response['datasetImportJobArn']\n", "print(json.dumps(interactions_create_dataset_import_job_response, indent=2))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Wait for Import Jobs to Complete\n", "\n", "It will take 10-15 minutes for the import jobs to complete, while you're waiting you can learn more about Datasets and Schemas in the [documentation](https://docs.aws.amazon.com/personalize/latest/dg/how-it-works-dataset-schema.html).\n", "\n", "We will wait for all three jobs to finish." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Wait for Items Import Job to Complete\n", "\n", "Before we can use the datasets, the import jobs must be active. Execute the cell below and wait for it to show the ACTIVE status for all import jobs. It checks the status of the import jobs every minute, up to a maximum of 3 hours.\n", "\n", "Importing the data can take some time, depending on the size of the dataset. 
" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "import_job_arns = [ items_dataset_import_job_arn, users_dataset_import_job_arn, interactions_dataset_import_job_arn ]\n", "\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time:\n", " for job_arn in reversed(import_job_arns):\n", " import_job_response = personalize.describe_dataset_import_job(\n", " datasetImportJobArn = job_arn\n", " )\n", " status = import_job_response[\"datasetImportJob\"]['status']\n", "\n", " if status == \"ACTIVE\":\n", " print(f'Import job {job_arn} successfully completed')\n", " import_job_arns.remove(job_arn)\n", " elif status == \"CREATE FAILED\":\n", " print(f'Import job {job_arn} failed')\n", " if import_job_response.get('failureReason'):\n", " print(' Reason: ' + import_job_response['failureReason'])\n", " import_job_arns.remove(job_arn)\n", "\n", " if len(import_job_arns) > 0:\n", " print('At least one dataset import job still in progress')\n", " time.sleep(60)\n", " else:\n", " print(\"All import jobs have ended\")\n", " break" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Congratulations you finished the data layer notebook\n", "\n", "Now, lets store all the values needed to continue on the next notebook." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store dataset_group_arn\n", "%store items_dataset_arn\n", "%store users_dataset_arn\n", "%store interactions_dataset_arn\n", "%store role_arn\n", "%store users_dataset_import_job_arn\n", "%store interactions_dataset_import_job_arn\n", "%store items_dataset_import_job_arn\n", "%store products_dataset_df\n", "%store products_df\n", "%store region" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "conda_amazonei_mxnet_p36", "language": "python", "name": "conda_amazonei_mxnet_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.13" } }, "nbformat": 4, "nbformat_minor": 4 }