{ "cells": [ { "cell_type": "markdown", "id": "f760cf42", "metadata": {}, "source": [ "# Amazon Personalize User Segmentation Workshop - Lab 7\n", "\n", "In this workshop we will prepare and execute a segmentation job that creates multiple audiences/segments of users with an affinity for different product categories in the Retail Demo Store. We could then take these user segments and use them to target users with promotions for items in each category." ] }, { "cell_type": "markdown", "id": "32e7894f", "metadata": {}, "source": [ "## Setup\n", "\n", "This workshop uses the Python programming language and the AWS SDK for Python. Even if you are not fluent in Python, the code cells should be reasonably intuitive. In practice, you can use any programming language supported by the AWS SDK to complete the same steps from this workshop in your application environment.\n", "\n", "### Update dependencies\n", "\n", "To get started, we need to perform a bit of setup. First, we need to ensure that a current version of botocore is locally installed. The botocore library is used by boto3, the AWS SDK library for Python. We need a current version to be able to access some of the newer Amazon Personalize features.\n", "\n", "The following cell will update pip and install the latest botocore library." ] }, { "cell_type": "code", "execution_count": null, "id": "2ec5d848", "metadata": {}, "outputs": [], "source": [ "import sys\n", "!{sys.executable} -m pip install --upgrade pip\n", "!{sys.executable} -m pip install --upgrade --no-deps --force-reinstall botocore" ] }, { "cell_type": "markdown", "id": "755f56a2", "metadata": {}, "source": [ "### Import dependencies and prepare clients\n", "\n", "First we will import the libraries and create the clients needed for this workshop."
] }, { "cell_type": "code", "execution_count": null, "id": "7f93a517", "metadata": {}, "outputs": [], "source": [ "# Import dependencies\n", "import boto3\n", "import json\n", "import pandas as pd\n", "import numpy\n", "import time\n", "import requests\n", "import botocore\n", "from datetime import datetime\n", "from IPython.display import display, HTML\n", "from packaging import version\n", "\n", "# Create clients\n", "personalize = boto3.client('personalize')\n", "servicediscovery = boto3.client('servicediscovery')\n", "ssm = boto3.client('ssm')" ] }, { "cell_type": "markdown", "id": "22c0ea4a", "metadata": {}, "source": [ "The following cell will load the saved variables from the earlier foundational Personalize workshops." ] }, { "cell_type": "code", "execution_count": null, "id": "0086e08f", "metadata": {}, "outputs": [], "source": [ "%store -r" ] }, { "cell_type": "markdown", "id": "9a020691", "metadata": {}, "source": [ "## Look up working S3 bucket name\n", "\n", "We will stage the segmentation job input file on S3 and have the segmentation job output written to S3. We'll use the same S3 stack bucket used for other Personalize workshops. The bucket name is stored in SSM, so let's look up the value." ] }, { "cell_type": "code", "execution_count": null, "id": "1f8aedfb", "metadata": {}, "outputs": [], "source": [ "bucketresponse = ssm.get_parameter(\n", "    Name='retaildemostore-stack-bucket'\n", ")\n", "\n", "# We will use this bucket to stage the job input file and receive the job output:\n", "bucket = bucketresponse['Parameter']['Value'] # Do Not Change\n", "\n", "print('Bucket: {}'.format(bucket))" ] }, { "cell_type": "markdown", "id": "7d76582b", "metadata": {}, "source": [ "## Retrieve IP addresses of Products and Users microservices\n", "\n", "We will use the product categories in the Retail Demo Store's catalog to find users with an affinity for each category. Let's get the local IP address of the Products microservice so we can call its API to retrieve products."
] }, { "cell_type": "code", "execution_count": null, "id": "7ff0c7e9", "metadata": {}, "outputs": [], "source": [ "response = servicediscovery.discover_instances(\n", "    NamespaceName='retaildemostore.local',\n", "    ServiceName='products',\n", "    MaxResults=1,\n", "    HealthStatus='HEALTHY'\n", ")\n", "\n", "assert len(response['Instances']) > 0, 'Products service instance not found; check ECS to ensure it launched cleanly'\n", "\n", "products_service_instance = response['Instances'][0]['Attributes']['AWS_INSTANCE_IPV4']\n", "print('Products Service Instance IP: {}'.format(products_service_instance))" ] }, { "cell_type": "markdown", "id": "dbe764a3", "metadata": {}, "source": [ "We will also look up info on each user included in the user segmentation job output to see who was selected for each segment. We'll use the Users microservice to get details on each user ID." ] }, { "cell_type": "code", "execution_count": null, "id": "6a6fc4a9", "metadata": {}, "outputs": [], "source": [ "response = servicediscovery.discover_instances(\n", "    NamespaceName='retaildemostore.local',\n", "    ServiceName='users',\n", "    MaxResults=1,\n", "    HealthStatus='HEALTHY'\n", ")\n", "\n", "assert len(response['Instances']) > 0, 'Users service instance not found; check ECS to ensure it launched cleanly'\n", "\n", "users_service_instance = response['Instances'][0]['Attributes']['AWS_INSTANCE_IPV4']\n", "print('Users Service Instance IP: {}'.format(users_service_instance))" ] }, { "cell_type": "markdown", "id": "469d54c1", "metadata": {}, "source": [ "## Load products into DataFrame\n", "\n", "Let's load all of the products from the Products microservice into a pandas DataFrame."
] }, { "cell_type": "code", "execution_count": null, "id": "694bc756", "metadata": {}, "outputs": [], "source": [ "response = requests.get('http://{}/products/all'.format(products_service_instance))\n", "products = response.json()\n", "products_df = pd.DataFrame(products)\n", "pd.set_option('display.max_rows', 5)\n", "\n", "products_df" ] }, { "cell_type": "markdown", "id": "3b4d95a4", "metadata": {}, "source": [ "## Prepare input file for batch segment job\n", "\n", "Next we will prepare a user segmentation input file by randomly selecting a few product categories from the catalog.\n", "\n", "First, let's consider the format of the job input file. Below is a sample input file for an item affinity job in a Video On Demand application. It builds 3 user segments: users interested in both comedies and action movies, users interested in comedies, and users interested in both horror and action movies:\n", "\n", "```javascript\n", "{\"itemAttributes\": \"ITEMS.genres = \\\"Comedy\\\" AND ITEMS.genres = \\\"Action\\\"\"}\n", "{\"itemAttributes\": \"ITEMS.genres = \\\"Comedy\\\"\"}\n", "{\"itemAttributes\": \"ITEMS.genres = \\\"Horror\\\" AND ITEMS.genres = \\\"Action\\\"\"}\n", "```\n", "\n", "For our user segmentation job, we will select a few categories and use each category name as the item attribute we want to use to group users." ] }, { "cell_type": "code", "execution_count": null, "id": "85e706ef", "metadata": {}, "outputs": [], "source": [ "categories = products_df['category'].unique()\n", "print(categories)" ] }, { "cell_type": "markdown", "id": "53ca40f8", "metadata": {}, "source": [ "Let's randomly select 3 categories to use in the job input file."
] }, { "cell_type": "code", "execution_count": null, "id": "fc9f767a", "metadata": {}, "outputs": [], "source": [ "segment_categories = numpy.random.choice(categories, size=3, replace=False)\n", "print(segment_categories)" ] }, { "cell_type": "code", "execution_count": null, "id": "83de00fa", "metadata": {}, "outputs": [], "source": [ "# Create and write job input file to disk\n", "json_input_filename = \"item_affinity_json_input.json\"\n", "with open(json_input_filename, 'w') as json_input:\n", "    for category in segment_categories:\n", "        # Write line that specifies the query for users with an affinity for the CATEGORY_L1 field\n", "        json_input.write(f'{{\"itemAttributes\": \"ITEMS.CATEGORY_L1 = \\\\\"{category}\\\\\"\"}}\\n')" ] }, { "cell_type": "markdown", "id": "85ff0f90", "metadata": {}, "source": [ "Display the job input file contents. One very important characteristic of the job input file is that the `itemAttributes` query expression for each segment must be fully defined in a single line." ] }, { "cell_type": "code", "execution_count": null, "id": "2efc4c66", "metadata": {}, "outputs": [], "source": [ "!cat $json_input_filename" ] }, { "cell_type": "markdown", "id": "f6b36c4b", "metadata": {}, "source": [ "## Upload job input file to S3 bucket\n", "\n", "Before we can create a segmentation job, we have to upload the job input file to our S3 bucket." ] }, { "cell_type": "code", "execution_count": null, "id": "7c378c9f", "metadata": {}, "outputs": [], "source": [ "# Upload file to S3\n", "boto3.Session().resource('s3').Bucket(bucket).Object(json_input_filename).upload_file(json_input_filename)\n", "s3_input_path = \"s3://\" + bucket + \"/\" + json_input_filename\n", "print(s3_input_path)" ] }, { "cell_type": "markdown", "id": "fb6886d5", "metadata": {}, "source": [ "## Define job output location\n", "\n", "We also need to define an output location in our S3 bucket where the segmentation job writes its output."
] }, { "cell_type": "code", "execution_count": null, "id": "f1a0aa17", "metadata": {}, "outputs": [], "source": [ "# Define the output path\n", "s3_output_path = \"s3://\" + bucket + \"/user-segmentation/attrib-affinity/\"\n", "print(s3_output_path)" ] }, { "cell_type": "markdown", "id": "6836c2c8", "metadata": {}, "source": [ "## Create batch segment job\n", "\n", "Finally, we're ready to create a batch segment job. There are several required parameters including the solution version ARN for the item affinity model we created in a prior workshop, the IAM role that Personalize needs to access the job input file and write the output file, and the job input and output locations. We're also optionally specifying that we only want the top 25 users in our user segment." ] }, { "cell_type": "code", "execution_count": null, "id": "7329f266", "metadata": {}, "outputs": [], "source": [ "response = personalize.create_batch_segment_job (\n", " solutionVersionArn = item_attribute_affinity_solution_version_arn,\n", " jobName = \"retaildemostore-item-attrib-affinity_\" + str(round(time.time()*1000)),\n", " roleArn = role_arn,\n", " jobInput = {\"s3DataSource\": {\"path\": s3_input_path}},\n", " jobOutput = {\"s3DataDestination\":{\"path\": s3_output_path}},\n", " numResults = 25\n", ")\n", "job_arn = response['batchSegmentJobArn']\n", "print(json.dumps(response, indent=2, default=str))" ] }, { "cell_type": "markdown", "id": "d86f291c", "metadata": {}, "source": [ "## Wait for batch segment job to complete\n", "\n", "The user segmentation job can take several minutes to complete. Even though our input file only specifies a few item affinity query expressions, there is a certain amount of fixed overhead required for Personalize to spin up the compute resources needed to execute the job. This overhead is amortized for larger input files that generate many user segments." 
] }, { "cell_type": "code", "execution_count": null, "id": "f44c0cb2", "metadata": {}, "outputs": [], "source": [ "%%time\n", "\n", "current_time = datetime.now()\n", "print(\"Job started at: \", current_time.strftime(\"%I:%M:%S %p\"))\n", "\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time:\n", "    response = personalize.describe_batch_segment_job(\n", "        batchSegmentJobArn = job_arn\n", "    )\n", "    status = response[\"batchSegmentJob\"]['status']\n", "    print(\"BatchSegmentJob: {}\".format(status))\n", "    \n", "    if status == \"ACTIVE\" or status == \"CREATE FAILED\":\n", "        break\n", "    \n", "    time.sleep(60)\n", "\n", "current_time = datetime.now()\n", "print(\"Job finished at: \", current_time.strftime(\"%I:%M:%S %p\"))" ] }, { "cell_type": "markdown", "id": "30c8cbc6", "metadata": {}, "source": [ "## Download and inspect job output file\n", "\n", "Let's download the output file from S3 and display its contents." ] }, { "cell_type": "code", "execution_count": null, "id": "a21ae405", "metadata": { "scrolled": false }, "outputs": [], "source": [ "s3 = boto3.client('s3')\n", "out_file = json_input_filename + \".out\"\n", "export_name = 'user-segmentation/attrib-affinity/' + out_file\n", "s3.download_file(bucket, export_name, out_file)\n", "\n", "!cat $out_file" ] }, { "cell_type": "markdown", "id": "680a8213", "metadata": {}, "source": [ "Notice that the input item attribute query expressions are echoed in the output file, but we also have `output` and `error` elements for each user segment. The `output` element has a `usersList` array that contains the user IDs for the segment. If any errors were encountered while generating a segment, details will be included in the `error` element for the segment."
] }, { "cell_type": "markdown", "id": "de142729", "metadata": {}, "source": [ "## Inspect user segments\n", "\n", "Let's take a look at the users selected for each segment to get a sense of their interests, recent behavior, and affinity for the category used for each segment.\n", "\n", "To be able to determine the relevance of the users for each segment, we will display the last 5 products viewed by the users in each segment. We'll get the 5 most recent products viewed from the interactions dataset used to train the item affinity solution version.\n", "\n", "Let's start by loading the interactions dataset into a dataframe and displaying the first few rows." ] }, { "cell_type": "code", "execution_count": null, "id": "7efdfe9b", "metadata": {}, "outputs": [], "source": [ "interactions_df = pd.read_csv('interactions.csv')\n", "interactions_df['USER_ID'] = interactions_df.USER_ID.astype(str)\n", "interactions_df['TIMESTAMP'] = interactions_df.TIMESTAMP.astype(int)\n", "interactions_df.head(5)" ] }, { "cell_type": "markdown", "id": "d8fd0c5f", "metadata": {}, "source": [ "We will query this dataframe for the user IDs included in each segment to fetch each user's interactions.\n", "\n", "Next we'll create a utility function to call for each user in a segment. It looks up the user's name from the Users microservice as well as the user's most recent product view events. This information is concatenated to a working dataframe that collects info on all users in a user segment."
] }, { "cell_type": "code", "execution_count": null, "id": "6fa03118", "metadata": {}, "outputs": [], "source": [ "def append_user_info_and_recent_interactions(df_work, user_id: str, count: int = 5):\n", "    # Fetch user info from the Users microservice.\n", "    response = requests.get('http://{}/users/id/{}'.format(users_service_instance, user_id))\n", "    user = response.json()\n", "\n", "    d = {\n", "        'userId': [ user_id ],\n", "        'name': [ user['first_name'] + ' ' + user['last_name'] ]\n", "    }\n", "\n", "    # Lookup product view events for the user, sort descending by timestamp, and trim list.\n", "    user_interactions_df = interactions_df.loc[\n", "        (interactions_df['USER_ID'] == user_id) & (interactions_df['EVENT_TYPE'] == 'View')\n", "    ]\n", "    user_interactions_df = user_interactions_df.sort_values(by=['TIMESTAMP'], ascending=False)\n", "    user_interactions_df = user_interactions_df.head(count)\n", "    \n", "    # Lookup product name and category for recently viewed products.\n", "    prod_count = 1\n", "    for product_id in user_interactions_df['ITEM_ID']:\n", "        product = products_df.loc[products_df['id'] == product_id].iloc[0]\n", "        d[f'productView{prod_count}'] = [ f'{product[\"name\"]} ({product[\"category\"]})' ]\n", "        prod_count += 1\n", "    \n", "    # Concatenate user details and recent views to working dataframe.\n", "    return pd.concat([df_work, pd.DataFrame(data=d)], axis=0, ignore_index=True)" ] }, { "cell_type": "markdown", "id": "285ab603", "metadata": {}, "source": [ "Finally, we're ready to read the contents of the user segmentation job output file and write an intuitive overview of each segment."
] }, { "cell_type": "code", "execution_count": null, "id": "5d5c98a8", "metadata": {}, "outputs": [], "source": [ "pd.options.display.max_rows = 25\n", "\n", "with open(out_file) as segments_file:\n", "    # Read all lines from the segmentation output file.\n", "    segment_lines = segments_file.readlines()\n", "    \n", "    for idx, segment_line in enumerate(segment_lines):\n", "        segment = json.loads(segment_line)\n", "        attrib = segment['input']['itemAttributes']\n", "        users_list = segment['output']['usersList']\n", "\n", "        # Get info on each user in the segment.\n", "        segment_info_df = pd.DataFrame()\n", "        for user_id in users_list:\n", "            segment_info_df = append_user_info_and_recent_interactions(segment_info_df, str(user_id))\n", "\n", "        # Display details on all users in the segment\n", "        display(HTML(f'