# Amazon Personalize - From Data Preparation to Campaign Deployment

This notebook uses <b>`conda_python3`</b> as the default kernel.
<br>
Deploy Personalize Campaign by running cells sequentially from start to finish.

## 0. Setting Environment

<i>(Optional)</i> Run boto3 sdk upgrade if needed.


### boto3 Upgrade (Optional)

In [None]:
# !pip install boto3 --upgrade

## 1. Data Preparation

We use the dataset from the <b>Retail Demo Store</b> below.
- Unpack the tar archive in the next cell before using the data.

* Retail Demo Store
    * https://github.com/aws-samples/retail-demo-store

In [None]:
import tarfile

# Open the archive in a context manager so its file handle is closed even if
# extraction raises part-way through.
with tarfile.open("../data/RetailDemoDataSet.tar") as tf:
    tf.extractall("../data")

In [None]:
import pandas as pd

# Read the three raw tables (items, users, interactions) in one pass.
items, users, its = map(
    pd.read_csv,
    ('../data/items.csv', '../data/users.csv', '../data/interactions.csv'),
)

## 2. Data Preprocessing

In [None]:
import boto3
import json
import numpy as np
import pandas as pd
import time
from datetime import datetime

import matplotlib.pyplot as plt

### Edit columns of <b>ITEMS</b> dataset

In [None]:
items.columns

In [None]:
def item_data_selection(df, cols):
    """Select `cols` from the items frame and rename them to upper-case names.

    Args:
        df: DataFrame holding the raw item records.
        cols: list of column labels to keep, in output order.

    Returns:
        A new DataFrame with only `cols`; any of id/name/category/style/
        description/price that are present are renamed per the map below.
    """
    column_map = {
        'id': 'ITEM_ID',
        'name': 'NAME',
        'category': 'CATEGORY_L1',
        'style': 'STYLE',
        'description': 'PRODUCT_DESCRIPTION',
        'price': 'PRICE',
    }
    return df[cols].rename(columns=column_map)


item_cols = ['id', 'name', 'category', 'style', 'description','price']
items_df = item_data_selection(items, item_cols)    

items_df.head(3)

### Edit columns of <b>USERS</b> dataset

In [None]:
users.columns

In [None]:
def user_data_selection(df, cols):
    """Select `cols` from the users frame and rename them to upper-case names.

    Args:
        df: DataFrame holding the raw user records.
        cols: list of column labels to keep, in output order.

    Returns:
        A new DataFrame with only `cols`; id/username/age/gender, when
        present, become USER_ID/USER_NAME/AGE/GENDER.
    """
    column_map = {
        'id': 'USER_ID',
        'username': 'USER_NAME',
        'age': 'AGE',
        'gender': 'GENDER',
    }
    return df[cols].rename(columns=column_map)

user_cols = ['id', 'username', 'age', 'gender']

users_df = user_data_selection(users, user_cols)    
users_df.head(3)

### Modify data type of <b>USERS</b> dataset

In [None]:
users_df.info()

In [None]:
def change_data_type(df, col, target_type):
    """Return a copy of `df` with column `col` cast to `target_type`.

    Args:
        df: source DataFrame; it is copied, not modified.
        col: label of the column to convert.
        target_type: dtype (or dtype string) handed to `Series.astype`.

    Returns:
        The converted copy.
    """
    converted = df.copy()
    converted[col] = converted[col].astype(target_type)
    return converted

users_df = change_data_type(users_df, col='USER_ID', target_type='object')
users_df.info()

### Edit columns of <b>INTERACTIONS</b> dataset

In [None]:
its.columns

In [None]:
def interactions_data_selection(df, cols):
    """Select `cols` from the interactions frame.

    Args:
        df: DataFrame holding the raw interaction records.
        cols: list of column labels to keep, in output order.

    Returns:
        A new DataFrame with only `cols`; id/username/age/gender, when
        present, are renamed to USER_ID/USER_NAME/AGE/GENDER.
    """
    # NOTE(review): this map matches the one in user_data_selection; none of its
    # keys appear in the interactions column list used by this notebook, so the
    # rename looks like a no-op there — confirm whether it can be dropped.
    column_map = {
        'id': 'USER_ID',
        'username': 'USER_NAME',
        'age': 'AGE',
        'gender': 'GENDER',
    }
    selected = df[cols]
    return selected.rename(columns=column_map)

interactions_cols = ['ITEM_ID', 'USER_ID', 'EVENT_TYPE', 'TIMESTAMP']

full_interactions_df = interactions_data_selection(its, interactions_cols)    
full_interactions_df.head(3)

### Edit EVENT_TYPE column of <b>INTERACTIONS</b> dataset 

Keep only the <b>ProductViewed</b> and <b>OrderCompleted</b> values of EVENT_TYPE and rename them to `View` and `Purchase` respectively.

In [None]:
full_interactions_df.EVENT_TYPE.value_counts()

In [None]:
def filter_interactions_data(df, kinds_event_type):
    """Keep rows whose EVENT_TYPE is in `kinds_event_type` and relabel the events.

    'ProductViewed' becomes 'View' and 'OrderCompleted' becomes 'Purchase';
    any other retained value is left unchanged.

    Args:
        df: interactions DataFrame with an 'EVENT_TYPE' column; not modified.
        kinds_event_type: iterable of EVENT_TYPE values to keep.

    Returns:
        A new DataFrame holding the filtered, relabelled rows.
    """
    # Take an explicit copy: assigning into the bare boolean-mask slice is
    # chained assignment, which emits SettingWithCopyWarning in pandas.
    ldf = df[df['EVENT_TYPE'].isin(kinds_event_type)].copy()
    ldf['EVENT_TYPE'] = ldf['EVENT_TYPE'].replace(
        {'ProductViewed': 'View', 'OrderCompleted': 'Purchase'}
    )

    return ldf

select_event_types = ['ProductViewed','OrderCompleted']
interactions_df = filter_interactions_data(full_interactions_df, select_event_types)
interactions_df

### Modify data type of <b>INTERACTIONS</b> dataset

In [None]:
interactions_df.info()

In [None]:
interactions_df = change_data_type(interactions_df, col='USER_ID', target_type='object')
interactions_df.info()

## 3. Upload the dataset to S3

In [None]:
import sagemaker

bucket='<YOUR BUCKET NAME>' # replace with the name of your S3 bucket
bucket

In [None]:
import os
os.makedirs('dataset', exist_ok=True)

In [None]:
items_filename = "dataset/training_item.csv"
users_filename = "dataset/training_user.csv"
its_filename = "dataset/training_interaction.csv"

items_df.to_csv(items_filename,index=False)
users_df.to_csv(users_filename,index=False)
interactions_df.to_csv(its_filename,index=False)

In [None]:
# Upload the three training CSVs. One S3 Bucket handle is reused for all
# uploads instead of building a new session and resource per file.
# upload_file returns None, so its result is not captured.
s3_bucket = boto3.Session().resource('s3').Bucket(bucket)
for local_path in (its_filename, users_filename, items_filename):
    # The object key is the same relative path as the local file.
    s3_bucket.upload_file(local_path, local_path)

s3_its_filename = "s3://{}/{}".format(bucket, its_filename)
s3_users_filename = "s3://{}/{}".format(bucket, users_filename)
s3_items_filename = "s3://{}/{}".format(bucket, items_filename)

print("s3_train_interaction_filename: \n", s3_its_filename)
print("s3_train_users_filename: \n", s3_users_filename)
print("s3_train_items_filename: \n", s3_items_filename)


In [None]:
! aws s3 ls {s3_its_filename} --recursive
! aws s3 ls {s3_users_filename} --recursive
! aws s3 ls {s3_items_filename} --recursive

## 4. Personalize : Create Dataset Group

In [None]:
import boto3
import json
import time
from datetime import datetime

# Configure the SDK to Personalize:
personalize = boto3.client('personalize')

### Creating an IAM Role to access S3 for Personalize 

In [None]:
s3 = boto3.client("s3")

# Bucket policy that lets the Amazon Personalize service principal read the
# training files. The actions are limited to object read/write and bucket
# listing rather than "s3:*", which would also allow deleting objects or
# changing bucket configuration.
# NOTE(review): s3:PutObject is kept so jobs that write results to this bucket
# keep working — drop it if the bucket is only used for imports.
policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            "Action": [
                "s3:GetObject",
                "s3:ListBucket",
                "s3:PutObject",
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket),
                "arn:aws:s3:::{}/*".format(bucket)
            ]
        }
    ]
}

# put_bucket_policy sets this document as the bucket's policy.
s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))

In [None]:
# Random 5-digit suffix appended to resource names so they do not collide.
# Formatting an integer always gives exactly five digits; slicing the string
# form of a float can give fewer characters, or an exponent such as "e-05",
# when the repr is short or in scientific notation.
suffix = "{:05d}".format(int(np.random.randint(0, 100000)))

In [None]:
iam = boto3.client("iam")
role_name = "PersonalizeRoleDemo" + suffix

# Trust policy: allows the Amazon Personalize service to assume this role.
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
            "Effect": "Allow",
            "Principal": {"Service": "personalize.amazonaws.com"},
            "Action": "sts:AssumeRole",
        }
    ],
}

# Create the role that Personalize will use.
create_role_response = iam.create_role(
    RoleName=role_name,
    AssumeRolePolicyDocument=json.dumps(assume_role_policy_document),
)
role_arn = create_role_response["Role"]["Arn"]

# Attach the managed policies: AmazonPersonalizeFullAccess, then AmazonS3FullAccess.
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"
for managed_policy_arn in (policy_arn, 'arn:aws:iam::aws:policy/AmazonS3FullAccess'):
    iam.attach_role_policy(RoleName=role_name, PolicyArn=managed_policy_arn)

# Wait 15 seconds so the IAM policy attachments can propagate.
time.sleep(15)

print(role_arn)

### Create Dataset Group

In [None]:
create_dataset_group_response = personalize.create_dataset_group(
    name = "RetailDemo-dataset-group" + suffix
)

dataset_group_arn = create_dataset_group_response['datasetGroupArn']
dataset_group_arn

#### Waiting for Dataset Group to become <b>Active</b>
The Dataset Group usually becomes active within 30 seconds.

In [None]:
# Poll the dataset group every 15 seconds until it reaches a terminal state
# or the 3-hour deadline passes.
status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_group_response = personalize.describe_dataset_group(
        datasetGroupArn = dataset_group_arn
    )
    status = describe_dataset_group_response["datasetGroup"]["status"]
    print("DatasetGroup: {}".format(status))

    if status == "ACTIVE" or status == "CREATE FAILED":
        break

    time.sleep(15)

# Later cells need an ACTIVE dataset group, so stop here on failure or timeout
# instead of letting a downstream call fail with a less obvious error.
if status != "ACTIVE":
    raise RuntimeError(
        "Dataset group did not become ACTIVE (last status: {}, failureReason: {})".format(
            status, describe_dataset_group_response["datasetGroup"].get("failureReason")
        )
    )

### Create Schema

#### for INTERACTIONS

In [None]:
interaction_schema_name = "RetailDemo-interaction-schema" + suffix

# Avro schema for the INTERACTIONS dataset: who (USER_ID) did what (EVENT_TYPE)
# to which item (ITEM_ID), and when (TIMESTAMP).
interaction_fields = [
    {"name": "USER_ID", "type": "string"},
    {"name": "ITEM_ID", "type": "string"},
    {"name": "EVENT_TYPE", "type": "string"},
    {"name": "TIMESTAMP", "type": "long"},
]
schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": interaction_fields,
    "version": "1.0",
}

create_schema_response = personalize.create_schema(
    name=interaction_schema_name,
    schema=json.dumps(schema),
)

interaction_schema_arn = create_schema_response['schemaArn']
print(json.dumps(create_schema_response, indent=2))

#### for ITEMS

In [None]:
item_schema_name = "RetailDemo-item-schema" + suffix

# Avro schema for the ITEMS dataset; CATEGORY_L1 and STYLE are flagged categorical.
item_fields = [
    {"name": "ITEM_ID", "type": "string"},
    {"name": "NAME", "type": "string"},
    {"name": "CATEGORY_L1", "type": ["string"], "categorical": True},
    {"name": "STYLE", "type": ["string"], "categorical": True},
    {"name": "PRODUCT_DESCRIPTION", "type": "string"},
    {"name": "PRICE", "type": "float"},
]
schema = {
    "type": "record",
    "name": "Items",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": item_fields,
    "version": "1.0",
}

create_metadata_schema_response = personalize.create_schema(
    name=item_schema_name,
    schema=json.dumps(schema),
)

item_schema_arn = create_metadata_schema_response['schemaArn']
print(json.dumps(create_metadata_schema_response, indent=2))

#### for USERS

In [None]:
user_schema_name = "RetailDemo-user-schema" + suffix

# Avro schema for the USERS dataset; GENDER is flagged categorical.
# NOTE(review): the users CSV written earlier also has an AGE column
# (user_data_selection keeps 'age'), but no AGE field is declared here —
# confirm whether it should be added.
user_fields = [
    {"name": "USER_ID", "type": "string"},
    {"name": "USER_NAME", "type": "string"},
    {"name": "GENDER", "type": ["string"], "categorical": True},
]
schema = {
    "type": "record",
    "name": "Users",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": user_fields,
    "version": "1.0",
}

create_metadata_schema_response = personalize.create_schema(
    name=user_schema_name,
    schema=json.dumps(schema),
)

user_schema_arn = create_metadata_schema_response['schemaArn']
print(json.dumps(create_metadata_schema_response, indent=2))

## 5. Personalize : Create Dataset

#### for INTERACTIONS

In [None]:
dataset_type = "INTERACTIONS"
create_dataset_response = personalize.create_dataset(
    name = "RetailDemo-interaction-dataset" + suffix,
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = interaction_schema_arn
)

interaction_dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))

#### for ITEMS

In [None]:
dataset_type = "ITEMS"
create_item_dataset_response = personalize.create_dataset(
    name = "RetailDemo-item-dataset" + suffix,
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = item_schema_arn,
  
)

item_dataset_arn = create_item_dataset_response['datasetArn']
print(json.dumps(create_item_dataset_response, indent=2))

#### for USERS

In [None]:
dataset_type = "USERS"
create_user_dataset_response = personalize.create_dataset(
    name = "RetailDemo-user-dataset" + suffix,
    datasetType = dataset_type,
    datasetGroupArn = dataset_group_arn,
    schemaArn = user_schema_arn,
  
)

user_dataset_arn = create_user_dataset_response['datasetArn']
print(json.dumps(create_user_dataset_response, indent=2))

#### Wait until Dataset creation is complete (usually 1 minute or less)

In [None]:
# Poll each dataset until it is ACTIVE instead of sleeping a fixed 60 seconds:
# a fixed delay either wastes time or is too short, and it hides a failed create.
max_time = time.time() + 10*60 # 10 minutes for all three datasets
for dataset_arn in (interaction_dataset_arn, item_dataset_arn, user_dataset_arn):
    while True:
        status = personalize.describe_dataset(datasetArn=dataset_arn)["dataset"]["status"]
        print("Dataset {}: {}".format(dataset_arn, status))

        if status == "ACTIVE":
            break
        if status == "CREATE FAILED" or time.time() >= max_time:
            # The import jobs that follow need every dataset to be ACTIVE.
            raise RuntimeError(
                "Dataset {} did not become ACTIVE (last status: {})".format(dataset_arn, status)
            )

        time.sleep(5)

## 6. Personalize : Import Dataset 

#### INTERACTIONS Dataset - Create Import Job

In [None]:
# Start the import of the interactions CSV from S3 into the INTERACTIONS dataset.
# The job name prefix is "RetailDemo" (previously misspelled "RetailDeom"),
# matching the item and user import jobs.
create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "RetailDemo-interaction-dataset-import" + suffix,
    datasetArn = interaction_dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}".format(bucket, its_filename)
    },
    roleArn = role_arn
)

# The variable name keeps its original spelling because the status-polling
# cell reads it under this name.
interation_dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_dataset_import_job_response, indent=2))

#### ITEMS Dataset - Create Import Job

In [None]:
create_item_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "RetailDemo-item-dataset-import" + suffix,
    datasetArn = item_dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}".format(bucket, items_filename)
    },
    roleArn = role_arn
)

item_dataset_import_job_arn = create_item_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_item_dataset_import_job_response, indent=2))

#### USERS Dataset - Create Import Job

In [None]:
create_user_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "RetailDemo-user-dataset-import" + suffix,
    datasetArn = user_dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}".format(bucket, users_filename)
    },
    roleArn = role_arn
)

user_dataset_import_job_arn = create_user_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_user_dataset_import_job_response, indent=2))

#### All Dataset Import jobs must be completed before proceeding to the next step.
#### The cells below therefore wait until all three import jobs become ACTIVE.

#### import job status of INTERACTIONS

In [None]:
%%time

# Poll the INTERACTIONS import job every 15 seconds until it reaches a
# terminal state or the 3-hour deadline passes.
status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = interation_dataset_import_job_arn
    )

    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    # Prefer the status of the latest run when the response carries one.
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}".format(status))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))

    if status == "ACTIVE" or status == "CREATE FAILED":
        break

    time.sleep(15)

# Training needs the imported data, so stop here on failure or timeout instead
# of carrying on silently.
if status != "ACTIVE":
    raise RuntimeError(
        "INTERACTIONS import job did not become ACTIVE (last status: {}, failureReason: {})".format(
            status, dataset_import_job.get("failureReason")
        )
    )

#### import job status of ITEMS

In [None]:
# Poll the ITEMS import job every 15 seconds until it reaches a terminal
# state or the 3-hour deadline passes.
status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = item_dataset_import_job_arn
    )

    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    # Prefer the status of the latest run when the response carries one.
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}".format(status))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))

    if status == "ACTIVE" or status == "CREATE FAILED":
        break

    time.sleep(15)

# Stop here on failure or timeout instead of carrying on silently.
if status != "ACTIVE":
    raise RuntimeError(
        "ITEMS import job did not become ACTIVE (last status: {}, failureReason: {})".format(
            status, dataset_import_job.get("failureReason")
        )
    )

#### import job status of USERS

In [None]:
# Poll the USERS import job every 15 seconds until it reaches a terminal
# state or the 3-hour deadline passes.
status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = user_dataset_import_job_arn
    )

    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    # Prefer the status of the latest run when the response carries one.
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}".format(status))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))

    if status == "ACTIVE" or status == "CREATE FAILED":
        break

    time.sleep(15)

# Stop here on failure or timeout instead of carrying on silently.
if status != "ACTIVE":
    raise RuntimeError(
        "USERS import job did not become ACTIVE (last status: {}, failureReason: {})".format(
            status, dataset_import_job.get("failureReason")
        )
    )

## 7. Personalize : Create Solution

### Create Solution with <b>"AWS-USER-PERSONALIZATION"</b> recipe

In [None]:
# Define the solution details.
# The name carries the same random suffix as every other resource in this
# notebook; a fixed name would collide with the solution left by an earlier run.
solution_name = "RetailDemo-user-personalization" + suffix
recipe_arn = "arn:aws:personalize:::recipe/aws-user-personalization"
perform_hpo = False # set to true if you want to perform hyperparameter optimization

# Create the solution with explicit hyperparameters (HPO is off above).
create_solution_response = personalize.create_solution(
    name=solution_name,
    recipeArn=recipe_arn,
    performHPO=perform_hpo,
    datasetGroupArn = dataset_group_arn,
    solutionConfig = {
        "algorithmHyperParameters": {
            "bptt": "32",
            "hidden_dimension": "149",
            "recency_mask": "true"
        },
        "featureTransformationParameters": {
            "max_user_history_length_percentile": "0.99",
            "min_user_history_length_percentile": "0.00"
        }
    }
)

# Get the solution ARN
solution_arn = create_solution_response['solutionArn']
print(f'Solution ARN: {solution_arn}')

### Create Solution Version

In [None]:
# Create the solution version
create_solution_version_response = personalize.create_solution_version(
    solutionArn=solution_arn
)

# Get the solution version ARN
solution_version_arn = create_solution_version_response['solutionVersionArn']
print(f'Solution version ARN: {solution_version_arn}')

#### Wait until Solution Version is in ACTIVE state
It takes about 20-30 minutes.


In [None]:
%%time

# Poll the solution version every 30 seconds until it reaches a terminal
# state or the 3-hour deadline passes.
status_solution = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:

    # status_aws_user_personalization
    describe_solution_response = personalize.describe_solution_version(
        solutionVersionArn = solution_version_arn
    )
    status_solution = describe_solution_response['solutionVersion']["status"]
    print("status_user-personalization : {}".format(status_solution))

    if (status_solution == "ACTIVE" or status_solution == "CREATE FAILED") :
        break
    print("-------------------------------------->")
    time.sleep(30)

# Report completion only on success: previously the message below was also
# printed after CREATE FAILED or a timeout.
if status_solution != "ACTIVE":
    raise RuntimeError(
        "Solution version did not become ACTIVE (last status: {}, failureReason: {})".format(
            status_solution, describe_solution_response['solutionVersion'].get("failureReason")
        )
    )

print("Generating solution version is completed")

## 8. Personalize : Create Campaign

In [None]:
create_campaign_reponse = personalize.create_campaign(
    name = 'RetailDemo-campaign' + suffix,
    solutionVersionArn = solution_version_arn,
    minProvisionedTPS=1
)

campaign_arn = create_campaign_reponse['campaignArn']


#### Wait for Campaign creation to complete
It takes about 7 minutes.

In [None]:
%%time

# Poll the campaign every 60 seconds until it reaches a terminal state or the
# 3-hour deadline passes.
status_campaign = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:

    describe_campaign_response = personalize.describe_campaign(
        campaignArn = campaign_arn
    )
    status_campaign = describe_campaign_response['campaign']["status"]
    print("status_creating_campaign : {}".format(status_campaign))

    if (status_campaign == "ACTIVE" or status_campaign == "CREATE FAILED") :
        break
    print("-------------------------------------->")
    time.sleep(60)

# Report completion only on success: previously the message below was also
# printed after CREATE FAILED or a timeout.
if status_campaign != "ACTIVE":
    raise RuntimeError(
        "Campaign did not become ACTIVE (last status: {}, failureReason: {})".format(
            status_campaign, describe_campaign_response['campaign'].get("failureReason")
        )
    )

print("Creating Campaign is completed")

#### save variable
Save variables needed for clean-up

In [None]:
%store dataset_group_arn
%store interaction_schema_arn
%store item_schema_arn
%store user_schema_arn
%store interaction_dataset_arn
%store item_dataset_arn
%store user_dataset_arn
%store solution_arn
%store campaign_arn


# You can make an inference request with the Personalize Campaign ARN below.
The Lambda function uses the Personalize Campaign ARN printed below to call the campaign.

In [None]:
print("Personalize Campaign ARN : ", campaign_arn)