{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "## Steps\n", "\n", "This notebook shows how to use Amazon Personalize's new user personalization recipe (aws-user-personalization). This recipe balances recommendations between new and old items, allowing you to adjust that balance in favor of newer or older items.\n", "\n", "At a high level, using the new user personalization recipe involves the following steps: \n", "\n", "1. Set up the Personalize clients.\n", "2. Create a DatasetGroup, define the schemas, import the datasets, and ingest real-time interactions.\n", "3. Create a Solution and a SolutionVersion.\n", "4. Create a Campaign with the new `campaignConfig` setting.\n", "5. Call GetRecommendations; a new field `recommendationId` is returned in the response.\n", "6. Create an Event Tracker to ingest the events sent by PutEvents.\n", "7. Call PutEvents with the `recommendationId` or a custom list of impression items.\n", "8. Create a new SolutionVersion in UPDATE mode and wait for it to become active.\n", "9. Update the campaign with the new SolutionVersion.\n", "10. Clean up.\n", "\n", "\n", "> **NOTE:** **Execution of this notebook will take a couple of hours.**" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Access Key/Secret Key setup for AWS API access\n", "\n", "Make sure the accessKeyId and secretAccessKey you use have the appropriate permissions. 
Also choose the region you want to run this demo in." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "accessKeyId = \"\"\n", "secretAccessKey = \"\"\n", "region_name = \"\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import boto3\n", "from botocore.exceptions import ClientError\n", "import time\n", "import numpy as np\n", "import pandas as pd\n", "import json\n", "from datetime import datetime" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "suffix = str(np.random.uniform())[4:9]\n", "prefix = 'user-personalization-'\n", "print('prefix+suffix:{}{}'.format(prefix, suffix))\n", "s3_bucket_name = (prefix + suffix).lower()\n", "interaction_schema_name = prefix + 'interaction-' + suffix\n", "item_metadata_schema_name = prefix + 'items-' + suffix\n", "dataset_group_name = prefix + suffix\n", "interaction_dataset_name = prefix + 'interactions-' + suffix\n", "item_metadata_dataset_name = prefix + 'items-' + suffix\n", "event_tracker_name = prefix + suffix\n", "solution_name = prefix + suffix\n", "campaign_name = prefix + suffix" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 1. Client setup\n", "Let's first set up the clients for Personalize and S3." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Public S3 bucket owned by the Personalize service, used to store the example datasets.\n", "\n", "personalize_s3_bucket = \"personalize-cli-json-models\"\n", "s3_client = boto3.Session(aws_access_key_id=accessKeyId,\n", "    aws_secret_access_key=secretAccessKey, region_name=region_name).client('s3')\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Initialize Personalize clients" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "personalize = boto3.Session(aws_access_key_id=accessKeyId,\n", "    aws_secret_access_key=secretAccessKey, region_name=region_name).client('personalize')\n", "personalize_runtime = boto3.Session(aws_access_key_id=accessKeyId,\n", "    aws_secret_access_key=secretAccessKey, region_name=region_name).client('personalize-runtime')\n", "personalize_events = boto3.Session(aws_access_key_id=accessKeyId,\n", "    aws_secret_access_key=secretAccessKey, region_name=region_name).client('personalize-events')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Sample Datasets\n", "\n", "For convenience and for the purposes of this demo, we'll use the sample datasets provided by Personalize.\n", "There are two datasets: one with item metadata and one with interactions. 
Let's first download them locally." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interaction_dataset_key = \"sample-dataset/interactions-sample.csv\"\n", "items_dataset_key = \"sample-dataset/items-with-creation-timestamp-sample.csv\"\n", "interactions_file = os.getcwd() + \"/interaction_raw.csv\"\n", "items_metadata_file = os.getcwd() + \"/items_raw.csv\"\n", "s3_client.download_file(personalize_s3_bucket, interaction_dataset_key, interactions_file)\n", "s3_client.download_file(personalize_s3_bucket, items_dataset_key, items_metadata_file)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interactions_df = pd.read_csv(interactions_file)\n", "items_df = pd.read_csv(items_metadata_file)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Let's have a glance at the interactions dataframe" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interactions_df.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Interactions dataset\n", "**ITEM_ID**: Item corresponding to the EVENT_TYPE.\n", "\n", "**EVENT_TYPE**: Event type. \n", "\n", "**TIMESTAMP**: Timestamp of the interaction, in Unix epoch seconds (note: seconds, not milliseconds, matching the `time.time()` arithmetic below). \n", "\n", "**USER_ID**: User id corresponding to this interaction. \n", "\n", "**IMPRESSION**: You can now optionally pass impression data along with the event data in the interactions dataset. It is passed in the new `IMPRESSION` field shown above, which takes a pipe-separated concatenation of the items that were shown to the user. Impressions also include the clicked items. 
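\n", "\n", "As a minimal illustration (with hypothetical item ids), the pipe-separated value for the `IMPRESSION` column can be built like this:\n", "\n", "```python\n", "shown_items = ['105', '106', '107']  # hypothetical items displayed to the user\n", "impression = '|'.join(shown_items)\n", "# impression == '105|106|107'\n", "```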
\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "items_df.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Items dataset\n", "\n", "The items dataset contains item ids and associated metadata.\n", "\n", "**ITEM_ID**: Item id. \n", "\n", "**genres**: Metadata of the item; if an item has multiple categorical values, use a pipe \" | \" to concatenate them.\n", "\n", "**creation_timestamp**: Timestamp (Unix epoch seconds) when the item was added.\n", "\n", "More details about datasets can be found in the documentation:\n", "https://docs.aws.amazon.com/personalize/latest/dg/how-it-works-dataset-schema.html" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Update the timestamps\n", "\n", "Here we are going to shift the timestamps in our datasets so that the latest ones fall 8 days before today, in order to show the impact of real-time interactions on our recommendations." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "current_time = int(time.time())\n", "eight_days_ago = current_time - 8 * 24 * 60 * 60\n", "# Get the gap between the latest timestamp in the interactions and the target time\n", "interactions_df = interactions_df.astype({\"TIMESTAMP\": 'int64'})\n", "latest_time_in_csv = interactions_df[\"TIMESTAMP\"].max()\n", "delta = eight_days_ago - latest_time_in_csv" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# shift interactions_df so its latest timestamp is 8 days ago\n", "interactions_df.TIMESTAMP = interactions_df.TIMESTAMP + delta\n", "interactions_df.to_csv(os.getcwd() + \"/interaction.csv\", index = False)\n", "\n", "# shift items_df by the same delta\n", "items_df = items_df.astype({\"creation_timestamp\": 'int64'})\n", "items_df.creation_timestamp = items_df.creation_timestamp + delta\n", "items_df.to_csv(os.getcwd() + \"/items.csv\", index = False)" ] }, 
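{ "cell_type": "markdown", "metadata": {}, "source": [ "As a quick sanity check (an optional addition, not required by the flow), the shifted maximum timestamp should now sit roughly 8 days in the past:" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# TIMESTAMP is in epoch seconds, so divide the gap by 86400 to get days\n", "days_back = (int(time.time()) - interactions_df['TIMESTAMP'].max()) / 86400\n", "print('latest interaction is {:.1f} days old'.format(days_back))" ] }, 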
{ "cell_type": "markdown", "metadata": {}, "source": [ "#### After the update, let's check the interactions and items datasets one more time" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interactions_df.head(2)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "items_df.head(2)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "\n", "### 2. Ingest data to Amazon Personalize\n", "Now let's create a DatasetGroup, create the schemas, upload the datasets, and create DatasetImportJobs. These steps are unchanged from other recipes." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### a. Create DatasetGroup\n", "\n", "This works the same way as with existing recipes; the full documentation can be found [here](https://docs.aws.amazon.com/personalize/latest/dg/API_DatasetGroup.html)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "create_dataset_group_response = personalize.create_dataset_group(\n", "    name = dataset_group_name\n", ")\n", "dataset_group_arn = create_dataset_group_response['datasetGroupArn']" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "\n", "print('dataset_group_arn : {}'.format(dataset_group_arn))" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = None\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time:\n", "    describe_dataset_group_response = personalize.describe_dataset_group(\n", "        datasetGroupArn = dataset_group_arn\n", "    )\n", "    status = describe_dataset_group_response[\"datasetGroup\"][\"status\"]\n", "    print(\"DatasetGroup: {}\".format(status))\n", "    \n", "    if status == \"ACTIVE\" or status == \"CREATE FAILED\":\n", "        break\n", "    \n", "    time.sleep(20)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### b. 
Create Dataset Schemas\n", "\n", "> **_NOTE:_** The `IMPRESSION` field has type string and uses a pipe-separated concatenation for multiple values." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interaction_schema = {\n", "    \"type\": \"record\",\n", "    \"name\": \"Interactions\",\n", "    \"namespace\": \"com.amazonaws.personalize.schema\",\n", "    \"fields\": [\n", "        {\n", "            \"name\": \"EVENT_TYPE\",\n", "            \"type\": \"string\"\n", "        },\n", "        {\n", "            \"name\": \"IMPRESSION\",\n", "            \"type\": \"string\"\n", "        },\n", "        {\n", "            \"name\": \"ITEM_ID\",\n", "            \"type\": \"string\"\n", "        },\n", "        {\n", "            \"name\": \"TIMESTAMP\",\n", "            \"type\": \"long\"\n", "        },\n", "        {\n", "            \"name\": \"USER_ID\",\n", "            \"type\": \"string\"\n", "        },\n", "    ],\n", "    \"version\": \"1.0\"\n", "}" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interaction_schema_response = personalize.create_schema(\n", "    name = interaction_schema_name,\n", "    schema = json.dumps(interaction_schema)\n", ")\n", "# print(json.dumps(interaction_schema_response, indent=2))\n", "interaction_schema_arn = interaction_schema_response['schemaArn']\n", "print('interaction_schema_arn:\\n', interaction_schema_arn)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "item_metadata_schema = {\n", "    \"type\": \"record\",\n", "    \"name\": \"Items\",\n", "    \"namespace\": \"com.amazonaws.personalize.schema\",\n", "    \"fields\": [\n", "        {\n", "            \"name\": \"ITEM_ID\",\n", "            \"type\": \"string\"\n", "        },\n", "        {\n", "            \"name\": \"GENRES\",\n", "            \"type\": \"string\",\n", "            \"categorical\": True\n", "        },\n", "        {\n", "            \"name\": \"CREATION_TIMESTAMP\",\n", "            \"type\": \"long\"\n", "        }\n", "    ],\n", "    \"version\": \"1.0\"\n", "}\n", "\n", "item_metadata_schema_response = personalize.create_schema(\n", "    name = item_metadata_schema_name,\n", "    schema = json.dumps(item_metadata_schema)\n", ")\n", "\n", "# 
print(json.dumps(item_metadata_schema_response, indent=2))\n", "item_metadata_schema_arn = item_metadata_schema_response['schemaArn']\n", "print('item_metadata_schema_arn:\\n', item_metadata_schema_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### c. Create Datasets" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interactions_dataset_response = personalize.create_dataset(\n", "    datasetType = 'INTERACTIONS',\n", "    datasetGroupArn = dataset_group_arn,\n", "    schemaArn = interaction_schema_arn,\n", "    name = interaction_dataset_name\n", ")\n", "interaction_dataset_arn = interactions_dataset_response['datasetArn']\n", "#print(json.dumps(interactions_dataset_response, indent=2))\n", "print('interaction_dataset_arn:\\n', interaction_dataset_arn)\n", "\n", "items_dataset_response = personalize.create_dataset(\n", "    datasetType = 'ITEMS',\n", "    datasetGroupArn = dataset_group_arn,\n", "    schemaArn = item_metadata_schema_arn,\n", "    name = item_metadata_dataset_name\n", ")\n", "item_metadata_dataset_arn = items_dataset_response['datasetArn']\n", "#print(json.dumps(items_dataset_response, indent=2))\n", "print('item_metadata_dataset_arn:\\n', item_metadata_dataset_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### d. 
Upload the datasets to the S3 bucket and set up the appropriate S3 bucket policy, IAM role, etc.\n", "\n", "We need to upload these datasets; alternatively, you could provide the name of a bucket that already contains them." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "#!aws s3 mb s3://{s3_bucket_name}\n", "s3_bucket_name" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# note: for us-east-1, call create_bucket without CreateBucketConfiguration\n", "s3_client.create_bucket(Bucket=s3_bucket_name,\n", "    CreateBucketConfiguration={\n", "        'LocationConstraint': region_name})" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "interactions_file = os.getcwd() + \"/interaction.csv\"\n", "items_metadata_file = os.getcwd() + \"/items.csv\"" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "s3_client.upload_file(Filename=interactions_file, Bucket=s3_bucket_name,\n", "    Key=\"interaction.csv\")\n", "s3_client.upload_file(Filename=items_metadata_file, Bucket=s3_bucket_name,\n", "    Key=\"items.csv\")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### e. Attach a policy to your S3 bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "policy = {\n", "    \"Version\": \"2012-10-17\",\n", "    \"Id\": \"PersonalizeS3BucketAccessPolicy\",\n", "    \"Statement\": [\n", "        {\n", "            \"Sid\": \"PersonalizeS3BucketAccessPolicy\",\n", "            \"Effect\": \"Allow\",\n", "            \"Principal\": {\n", "                \"Service\": \"personalize.amazonaws.com\"\n", "            },\n", "            \"Action\": [\n", "                \"s3:GetObject\",\n", "                \"s3:ListBucket\"\n", "            ],\n", "            \"Resource\": [\n", "                \"arn:aws:s3:::{}\".format(s3_bucket_name),\n", "                \"arn:aws:s3:::{}/*\".format(s3_bucket_name)\n", "            ]\n", "        }\n", "    ]\n", "}\n", "\n", "s3_client.put_bucket_policy(Bucket=s3_bucket_name, Policy=json.dumps(policy));" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### f. 
Set up an appropriate IAM role so Personalize can access the datasets" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "iam = boto3.client(service_name='iam',\n", "    aws_access_key_id = accessKeyId,\n", "    aws_secret_access_key = secretAccessKey)\n", "\n", "\n", "role_name = \"PersonalizeS3Role-\"+suffix\n", "assume_role_policy_document = {\n", "    \"Version\": \"2012-10-17\",\n", "    \"Statement\": [\n", "        {\n", "            \"Effect\": \"Allow\",\n", "            \"Principal\": {\n", "                \"Service\": \"personalize.amazonaws.com\"\n", "            },\n", "            \"Action\": \"sts:AssumeRole\"\n", "        }\n", "    ]\n", "}\n", "try:\n", "    create_role_response = iam.create_role(\n", "        RoleName = role_name,\n", "        AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)\n", "    )\n", "\n", "    iam.attach_role_policy(\n", "        RoleName = role_name,\n", "        PolicyArn = \"arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess\"\n", "    )\n", "\n", "    role_arn = create_role_response[\"Role\"][\"Arn\"]\n", "except ClientError as e:\n", "    if e.response['Error']['Code'] == 'EntityAlreadyExists':\n", "        role_arn = iam.get_role(RoleName=role_name)['Role']['Arn']\n", "    else:\n", "        raise" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "print('role_arn:', role_arn)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### g. 
Create DatasetImportJobs to upload the data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "time.sleep(20) # give the new IAM role time to propagate\n", "interactions_dij_response = personalize.create_dataset_import_job(\n", "    jobName = prefix + 'interactions-dij-' + suffix,\n", "    datasetArn = interaction_dataset_arn,\n", "    dataSource = {\n", "        \"dataLocation\": \"s3://{}/{}\".format(s3_bucket_name, 'interaction.csv')\n", "    },\n", "    roleArn = role_arn\n", ")\n", "\n", "interactions_dij_arn = interactions_dij_response['datasetImportJobArn']\n", "print('interactions_dij_arn: ', interactions_dij_arn)\n", "#print(json.dumps(interactions_dij_arn, indent=2))\n", "\n", "items_dij_response = personalize.create_dataset_import_job(\n", "    jobName = prefix + 'items-dij-' + suffix,\n", "    datasetArn = item_metadata_dataset_arn,\n", "    dataSource = {\n", "        \"dataLocation\": \"s3://{}/{}\".format(s3_bucket_name, 'items.csv')\n", "    },\n", "    roleArn = role_arn\n", ")\n", "\n", "items_dij_arn = items_dij_response['datasetImportJobArn']\n", "print('items_dij_arn:', items_dij_arn)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "dataset_job_arns = [interactions_dij_arn, items_dij_arn]\n", "\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time and len(dataset_job_arns) != 0:\n", "    time.sleep(60)\n", "    # iterate over a copy so completed jobs can be removed safely\n", "    for dij_arn in list(dataset_job_arns):\n", "        describe_dataset_import_job_response = personalize.describe_dataset_import_job(\n", "            datasetImportJobArn = dij_arn\n", "        )\n", "        dataset_import_job = describe_dataset_import_job_response[\"datasetImportJob\"]\n", "        status = None\n", "        if \"latestDatasetImportJobRun\" not in dataset_import_job:\n", "            status = dataset_import_job[\"status\"]\n", "            print(\"{} : {}\".format(dij_arn, status))\n", "        else:\n", "            status = dataset_import_job[\"latestDatasetImportJobRun\"][\"status\"]\n", "            print(\"DIJ_ARN: {}, LatestDatasetImportJobRun: 
{}\".format(dij_arn, status))\n", "        \n", "        if status == \"ACTIVE\" or status == \"CREATE FAILED\":\n", "            dataset_job_arns.remove(dij_arn)\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 3. Create Solution, SolutionVersion\n", "\n", "We will create a solution with the 'aws-user-personalization' recipe, which balances recommendations between new and old items delivered to users." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "recipe_arn = \"arn:aws:personalize:::recipe/aws-user-personalization\"\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "create_solution_response = None\n", "while time.time() < max_time:\n", "\n", "    try:\n", "        create_solution_response = personalize.create_solution(name=solution_name,\n", "            recipeArn= recipe_arn,\n", "            datasetGroupArn = dataset_group_arn)\n", "\n", "        solution_arn = create_solution_response['solutionArn']\n", "        print('solution_arn: ', solution_arn)\n", "        break\n", "    except personalize.exceptions.ClientError as e:\n", "        if 'EVENT_INTERACTIONS' not in str(e):\n", "            print(e)\n", "            break\n", "        time.sleep(60) # interactions data not ready yet; retry\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create SolutionVersion" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "create_solution_version_response = personalize.create_solution_version(solutionArn = solution_arn)\n", "\n", "solution_version_arn = create_solution_version_response['solutionVersionArn']\n", "print('solution_version_arn:', solution_version_arn)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = None\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time:\n", "    describe_solution_version_response = personalize.describe_solution_version(\n", "        solutionVersionArn = solution_version_arn\n", "    )\n", "    status = 
describe_solution_version_response[\"solutionVersion\"][\"status\"]\n", "    print(\"SolutionVersion: {}\".format(status))\n", "    \n", "    if status == \"ACTIVE\" or status == \"CREATE FAILED\":\n", "        break\n", "    \n", "    time.sleep(60)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 4. Create a campaign\n", "When creating the campaign, we can set `itemExplorationConfig` to configure the exploration weight for cold items and the exploration item-age cutoff. Here we set a high `explorationWeight` of 0.9 and an `explorationItemAgeCutOff` of 7, so every item created less than 7 days ago is treated as a cold item and gets more exploration." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Create Campaign" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "create_campaign_response = personalize.create_campaign(\n", "    name = prefix + suffix,\n", "    solutionVersionArn = solution_version_arn,\n", "    minProvisionedTPS = 1,\n", "    campaignConfig = {\n", "        \"itemExplorationConfig\": {\n", "            \"explorationWeight\": \"0.9\",\n", "            \"explorationItemAgeCutOff\": \"7\"\n", "        }\n", "    }\n", ")\n", "\n", "campaign_arn = create_campaign_response['campaignArn']\n", "print('campaign_arn:', campaign_arn)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = None\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time:\n", "    describe_campaign_response = personalize.describe_campaign(\n", "        campaignArn = campaign_arn\n", "    )\n", "    status = describe_campaign_response[\"campaign\"][\"status\"]\n", "    print(\"Campaign: {}\".format(status))\n", "    \n", "    if status == \"ACTIVE\" or status == \"CREATE FAILED\":\n", "        break\n", "    \n", "    time.sleep(60)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "describe_campaign_response = 
personalize.describe_campaign(campaignArn = campaign_arn)\n", "campaign_summary = describe_campaign_response[\"campaign\"]\n", "campaign_summary" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 5. Call GetRecommendations\n", "For the purposes of this demo, we'll use user ids from the input dataset to make GetRecommendations calls. \n", "> **_NOTE:_** The response contains a new field, `recommendationId`, which corresponds to the list of items returned by GetRecommendations. You can pass this `recommendationId` to indicate the impressions. \n", "You could also pass impressions as a pipe-separated concatenation of item ids; if you pass both the `recommendationId` and an impression list, the impression list takes precedence." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "rec_response = personalize_runtime.get_recommendations(campaignArn = campaign_arn, userId = '101')\n", "print(rec_response['recommendationId'])" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rec_response['itemList']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 6. Create Event Tracker\n", "\n", "Creates an event tracker that you use when sending event data to the specified dataset group using the PutEvents API." ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": true }, "outputs": [], "source": [ "event_tracker_response = personalize.create_event_tracker(\n", "    name=event_tracker_name,\n", "    datasetGroupArn=dataset_group_arn\n", ")\n", "event_tracker_arn = event_tracker_response['eventTrackerArn']\n", "event_tracking_id = event_tracker_response['trackingId']\n", "#print(json.dumps(event_tracker_response, indent=2))\n", "print('eventTrackerArn:{},\\n eventTrackingId:{}'.format(event_tracker_arn, event_tracking_id))" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 7. 
Send Impression data to Personalize via PutEvents\n", "Amazon Personalize can model two types of impressions: \n", "1. Implicit impressions are impressions that occur during a user's session and are automatically recorded by Amazon Personalize whenever the user is shown an item. You can integrate them into your recommendation workflow by including the `recommendationId` (returned by the GetRecommendations operation) as input for future PutEvents requests. \n", "\n", "\n", "2. Explicit impressions are impressions that you manually input when making a PutEvents request. You would use explicit impressions when, for example, you do not show some of the items returned by GetRecommendations due to unavailability. \n", "\n", "> **NOTE:** If you have defined `IMPRESSION` in your interactions schema as above, you need to send the impression list (either the items returned from GetRecommendations or your own). \n", "**When both a recommendationId and explicit impressions are sent, Amazon Personalize will use the explicit impressions by default.**\n", "\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### Let's put the previously recommended items as impressions" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "scrolled": false }, "outputs": [], "source": [ "personalize_events.put_events(\n", "    trackingId = event_tracking_id,\n", "    userId = '101',\n", "    sessionId = '1',\n", "    eventList = [{\n", "        'sentAt': datetime.now().timestamp(),\n", "        'eventType': 'click',\n", "        'itemId': rec_response['itemList'][0]['itemId'],\n", "        'recommendationId': rec_response['recommendationId'],\n", "        'impression': [item['itemId'] for item in rec_response['itemList']],\n", "    }]\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "#### We can also put some new items\n", "\n", "Let's send clicks for some new items (ids 240-249)." 
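, "\n", "PutEvents accepts only a small batch of events per call, so a longer event list needs to be sent in chunks. A minimal sketch (hypothetical `chunked` helper, assuming a batch limit of 10):\n", "\n", "```python\n", "def chunked(seq, size=10):\n", "    # yield successive size-length slices of seq\n", "    for i in range(0, len(seq), size):\n", "        yield seq[i:i + size]\n", "\n", "# for batch in chunked(all_events):\n", "#     personalize_events.put_events(trackingId=event_tracking_id, userId='101', sessionId='1', eventList=batch)\n", "```"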
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# send one click event per new item id\n", "new_item_events = [{\n", "    'sentAt': datetime.now().timestamp(),\n", "    'eventType': 'click',\n", "    'itemId': str(item_id),\n", "} for item_id in range(240, 250)]\n", "\n", "personalize_events.put_events(\n", "    trackingId = event_tracking_id,\n", "    userId = '101',\n", "    sessionId = '1',\n", "    eventList = new_item_events\n", ")" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 8. 
Create a new SolutionVersion with UPDATE training mode\n", "\n", "After PutEvents, please wait around 15 minutes for Personalize to ingest the new data; after that, create a new SolutionVersion with trainingMode UPDATE." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "create_solution_version_response = personalize.create_solution_version(solutionArn = solution_arn, trainingMode = \"UPDATE\")\n", "\n", "solution_version_after_update = create_solution_version_response['solutionVersionArn']\n", "print('solution_version_after_update:', solution_version_after_update)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "status = None\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time:\n", "    describe_solution_version_response = personalize.describe_solution_version(\n", "        solutionVersionArn = solution_version_after_update\n", "    )\n", "    status = describe_solution_version_response[\"solutionVersion\"][\"status\"]\n", "    print(\"SolutionVersion: {}\".format(status))\n", "    \n", "    if status == \"ACTIVE\" or status == \"CREATE FAILED\":\n", "        break\n", "    \n", "    time.sleep(60)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 9. Update Campaign\n", "\n", "Update the campaign with the solution version ARN produced by the update." 
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "campaign_arn_response = personalize.update_campaign(campaignArn=campaign_arn, solutionVersionArn=solution_version_after_update)\n", "print('campaign_arn_response: ', campaign_arn_response)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Wait for the campaign update to reflect the new solution version\n", "solutionVersionArn = None\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time:\n", "    describe_campaign_response = personalize.describe_campaign(\n", "        campaignArn = campaign_arn\n", "    )\n", "    solutionVersionArn = describe_campaign_response[\"campaign\"][\"solutionVersionArn\"]\n", "    print(\"Campaign solution version: {}\".format(solutionVersionArn))\n", "    \n", "    if solutionVersionArn == solution_version_after_update:\n", "        break\n", "    \n", "    time.sleep(60)\n", "\n", "# wait 1 minute\n", "time.sleep(60)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "desc_campaign_response = personalize.describe_campaign(campaignArn = campaign_arn)['campaign'][\"solutionVersionArn\"]\n", "desc_campaign_response" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### After updating the solution version, let's get recommendations again\n", "\n", "We expect new items to show up in the recommendations, since we set a high explorationWeight of 0.9." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rec_response = personalize_runtime.get_recommendations(campaignArn = campaign_arn, userId = '101')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rec_response['itemList']" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Update campaign with a different explorationWeight\n", "\n", "We would expect more old items in the recommendation list since we set a low 
explorationWeight." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "desc_campaign_response = personalize.describe_campaign(campaignArn = campaign_arn)['campaign']\n", "desc_campaign_response" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "campaign_arn_response = personalize.update_campaign(campaignArn=campaign_arn, campaignConfig = {\n", "    \"itemExplorationConfig\": {\n", "        \"explorationWeight\": \"0.1\",\n", "        \"explorationItemAgeCutOff\": \"7\"\n", "    }\n", "})\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# Wait for the campaign update to reflect the new explorationWeight\n", "explorationWeight = None\n", "max_time = time.time() + 3*60*60 # 3 hours\n", "while time.time() < max_time:\n", "    describe_campaign_response = personalize.describe_campaign(\n", "        campaignArn = campaign_arn\n", "    )\n", "    explorationWeight = describe_campaign_response[\"campaign\"][\"campaignConfig\"]['itemExplorationConfig']['explorationWeight']\n", "    print(\"Current Campaign explorationWeight: {}\".format(explorationWeight))\n", "    \n", "    if explorationWeight == \"0.1\":\n", "        break\n", "    \n", "    time.sleep(60)\n", "\n", "# wait 1 minute\n", "time.sleep(60)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### After updating the explorationWeight\n", "\n", "Let's get recommendations again; we should see more old items here." ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rec_response = personalize_runtime.get_recommendations(campaignArn = campaign_arn, userId = '101')" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "rec_response" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### 10. Delete Resources\n", "\n", "Now that we have finished, let's clean up all the resources we created." 
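, "\n", "Note that the cleanup cell below removes only the Personalize resources; the S3 bucket and IAM role created earlier are left in place. Assuming you also want to remove them, a sketch (using the `s3_client`, `iam`, `s3_bucket_name`, and `role_name` defined above):\n", "\n", "```python\n", "# empty and delete the demo bucket, then detach the policy and delete the role\n", "for obj in s3_client.list_objects_v2(Bucket=s3_bucket_name).get('Contents', []):\n", "    s3_client.delete_object(Bucket=s3_bucket_name, Key=obj['Key'])\n", "s3_client.delete_bucket(Bucket=s3_bucket_name)\n", "iam.detach_role_policy(RoleName=role_name, PolicyArn='arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess')\n", "iam.delete_role(RoleName=role_name)\n", "```"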
] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "personalize.delete_campaign(campaignArn=campaign_arn)\n", "while len(personalize.list_campaigns(solutionArn=solution_arn)['campaigns']):\n", " time.sleep(5)\n", "\n", "personalize.delete_solution(solutionArn=solution_arn)\n", "while len(personalize.list_solutions(datasetGroupArn=dataset_group_arn)['solutions']):\n", " time.sleep(5)\n", "\n", "for dataset in personalize.list_datasets(datasetGroupArn=dataset_group_arn)['datasets']:\n", " personalize.delete_dataset(datasetArn=dataset['datasetArn'])\n", "while len(personalize.list_datasets(datasetGroupArn=dataset_group_arn)['datasets']):\n", " time.sleep(5)\n", " \n", "personalize.delete_event_tracker(eventTrackerArn=event_tracker_arn)\n", "personalize.delete_dataset_group(datasetGroupArn=dataset_group_arn)" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.7" } }, "nbformat": 4, "nbformat_minor": 4 }