# Amazon Personalize - 데이터 준비부터 Campaign 배포까지

이 노트북은 기본 커널로 conda_python3를 사용합니다.
<br>
처음부터 끝까지 순차적으로 실행하면 Personalize Campaign이 배포됩니다.

## 0. 환경 셋업

필요하면 boto3 SDK를 업그레이드합니다.


### boto3 업그레이드

In [None]:
# !pip install boto3 --upgrade

## 1. 데이터 준비

아래 Git 저장소에서 인공적으로 생성한 데이터 세트를 사용합니다.
- 생성된 데이터 세트를 tar로 압축하여 현재 Git 저장소에 저장했습니다. 이를 압축 해제해서 사용합니다.

* Retail Demo Store
    * https://github.com/aws-samples/retail-demo-store

In [None]:
import tarfile

# Extract the bundled Retail Demo Store archive into ../data; the next cell
# reads items.csv, users.csv and interactions.csv from that directory.
# The context manager closes the archive handle once extraction finishes.
with tarfile.open("../data/RetailDemoDataSet.tar") as tf:
    # The archive ships with this repository. If this is ever pointed at an
    # untrusted tar file, pass filter="data" (Python 3.12+ / security
    # backports) to reject path-traversal members.
    tf.extractall("../data")

In [None]:
import pandas as pd

# Load the three extracted Retail Demo Store CSVs (items, users, interactions).
items, users, its = (
    pd.read_csv(csv_path)
    for csv_path in ('../data/items.csv', '../data/users.csv', '../data/interactions.csv')
)

## 2. 데이터 전처리

In [None]:
import boto3
import json
import numpy as np
import pandas as pd
import time
from datetime import datetime

import matplotlib.pyplot as plt

### Items 컬럼 조정

In [None]:
# Show the raw item columns so the selection/rename below can be checked against them.
items.columns

In [None]:
def item_data_selection(df, cols):
    """Select ``cols`` from the raw items DataFrame and rename them to Personalize item fields.

    Renames id/name/category/style/description/price to
    ITEM_ID/NAME/CATEGORY_L1/STYLE/PRODUCT_DESCRIPTION/PRICE; any other
    selected column keeps its name. ``df`` itself is not modified.
    """
    personalize_names = {
        'id': 'ITEM_ID',
        'name': 'NAME',
        'category': 'CATEGORY_L1',
        'style': 'STYLE',
        'description': 'PRODUCT_DESCRIPTION',
        'price': 'PRICE',
    }
    return df[cols].rename(columns=personalize_names)


# Raw item columns to keep; item_data_selection renames them to Personalize field names.
item_cols = ['id', 'name', 'category', 'style', 'description','price']
items_df = item_data_selection(items, item_cols)    

items_df.head(3)

### Users 컬럼 조정

In [None]:
# Show the raw user columns so the selection/rename below can be checked against them.
users.columns

In [None]:
def user_data_selection(df, cols):
    """Select ``cols`` from the raw users DataFrame and rename them to Personalize user fields.

    id/username/age/gender become USER_ID/USER_NAME/AGE/GENDER; any other
    selected column keeps its name. ``df`` itself is not modified.
    """
    personalize_names = dict(zip(['id', 'username', 'age', 'gender'],
                                 ['USER_ID', 'USER_NAME', 'AGE', 'GENDER']))
    selected = df[cols]
    return selected.rename(columns=personalize_names)

# Raw user columns to keep; user_data_selection renames them to Personalize field names.
user_cols = ['id', 'username', 'age', 'gender']

users_df = user_data_selection(users, user_cols)    
users_df.head(3)

### Users 데이터 타입 변경

In [None]:
# Inspect dtypes before casting USER_ID in the next cell.
users_df.info()

In [None]:
def change_data_type(df, col, target_type):
    """Return a new DataFrame in which column ``col`` is cast to ``target_type``.

    The input ``df`` is left unchanged; all other columns are carried over as-is.
    """
    return df.astype({col: target_type})

# Cast USER_ID to object dtype (the users schema declares USER_ID as "string"), then re-check dtypes.
users_df = change_data_type(users_df, col='USER_ID', target_type='object')
users_df.info()

### Interactions 컬럼 조정

In [None]:
# Show the raw interaction columns so the selection below can be checked against them.
its.columns

In [None]:
def interactions_data_selection(df, cols):
    """Return only the ``cols`` columns of the raw interactions DataFrame.

    The raw interactions data is selected with Personalize field names
    (ITEM_ID, USER_ID, EVENT_TYPE, TIMESTAMP), so no renaming is applied.

    Args:
        df: raw interactions DataFrame.
        cols: column labels to keep, in output order.

    Returns:
        A new DataFrame independent of ``df``.
    """
    # Explicit copy: callers assign into columns of the result, and a bare
    # column selection can trigger pandas' SettingWithCopyWarning.
    return df[list(cols)].copy()

# The raw interactions data already uses Personalize column names; just select them.
interactions_cols = ['ITEM_ID', 'USER_ID', 'EVENT_TYPE', 'TIMESTAMP']

full_interactions_df = interactions_data_selection(its, interactions_cols)    
full_interactions_df.head(3)

### Interaction의 EVENT_TYPE 선택

여기서는 EVENT_TYPE 중 ProductViewed, OrderCompleted만 선택하고, 이름을 각각 View, Purchase로 변경합니다.

In [None]:
# Count interactions per EVENT_TYPE to decide which event types to keep.
full_interactions_df.EVENT_TYPE.value_counts()

In [None]:
def filter_interactions_data(df, kinds_event_type, event_type_names=None):
    """Keep only the given event types and rename them to Personalize event names.

    Args:
        df: interactions DataFrame with an ``EVENT_TYPE`` column.
        kinds_event_type: EVENT_TYPE values to keep, e.g.
            ``['ProductViewed', 'OrderCompleted']``.
        event_type_names: optional mapping from a raw event type to the name
            written to the output. Defaults to
            ``{'ProductViewed': 'View', 'OrderCompleted': 'Purchase'}``.
            Kept values that are not in the mapping stay unchanged.

    Returns:
        A new filtered DataFrame; ``df`` is not modified.
    """
    if event_type_names is None:
        event_type_names = {'ProductViewed': 'View', 'OrderCompleted': 'Purchase'}
    # .copy() detaches the filtered rows from df, so the column assignment
    # below does not trigger pandas' SettingWithCopyWarning.
    ldf = df[df['EVENT_TYPE'].isin(kinds_event_type)].copy()
    ldf['EVENT_TYPE'] = ldf['EVENT_TYPE'].replace(event_type_names)
    return ldf

# Keep only product views and completed orders; filter_interactions_data renames them to View / Purchase.
select_event_types = ['ProductViewed','OrderCompleted']
interactions_df = filter_interactions_data(full_interactions_df, select_event_types)
interactions_df

### Interaction 데이터 타입 변경

In [None]:
# Inspect dtypes before casting USER_ID in the next cell.
interactions_df.info()

In [None]:
# Cast USER_ID to object dtype (the interactions schema declares USER_ID as "string"), then re-check dtypes.
interactions_df = change_data_type(interactions_df, col='USER_ID', target_type='object')
interactions_df.info()

## 3. 데이터셋을 S3로 업로드

In [None]:
import sagemaker

# S3 bucket that receives the training CSVs and that Personalize imports them from.
bucket='<YOUR BUCKET NAME>' # replace with the name of your S3 bucket
# Fail fast while the placeholder is still in place; otherwise the mistake
# only surfaces later as a less obvious S3 error during upload.
if not bucket or bucket.startswith('<'):
    raise ValueError("Set `bucket` to the name of your S3 bucket before running the rest of this notebook.")
bucket

In [None]:
import os

# Local staging directory for the training CSVs; an existing directory is fine.
os.makedirs(name='dataset', exist_ok=True)

In [None]:
# Local output paths for the three training CSVs.
items_filename = "dataset/training_item.csv"
users_filename = "dataset/training_user.csv"
its_filename = "dataset/training_interaction.csv"

# Write each prepared DataFrame without the pandas index column.
for _frame, _path in ((items_df, items_filename),
                      (users_df, users_filename),
                      (interactions_df, its_filename)):
    _frame.to_csv(_path, index=False)

In [None]:
#upload file for training
# Upload the three training CSVs through a single bucket resource, reusing
# each local relative path as the S3 object key.
s3_bucket = boto3.Session().resource('s3').Bucket(bucket)
for local_filename in (its_filename, users_filename, items_filename):
    s3_bucket.Object(local_filename).upload_file(local_filename)

# Full S3 URIs of the uploaded files (the import jobs read the same locations).
s3_its_filename = "s3://{}/{}".format(bucket, its_filename)
s3_users_filename = "s3://{}/{}".format(bucket, users_filename)
s3_items_filename = "s3://{}/{}".format(bucket, items_filename)

print("s3_train_interaction_filename: \n", s3_its_filename)
print("s3_train_users_filename: \n", s3_users_filename)
print("s3_train_items_filename: \n", s3_items_filename)


In [None]:
# List each uploaded CSV in S3 (IPython shell escapes) to confirm the upload succeeded.
! aws s3 ls {s3_its_filename} --recursive
! aws s3 ls {s3_users_filename} --recursive
! aws s3 ls {s3_items_filename} --recursive

## 4. Personalize : Create Dataset Group

In [None]:
import json
import time
from datetime import datetime

import boto3

# Personalize control-plane client used for the dataset group, schema,
# dataset, import-job, solution and campaign calls below.
personalize = boto3.client(service_name='personalize')

### Personalize 서비스의 S3 접근 권한을 위한 IAM Role 생성

In [None]:
s3 = boto3.client("s3")

# Bucket policy that lets the Personalize service principal access this
# bucket and its objects, so the import jobs can read the training CSVs.
# NOTE(review): put_bucket_policy replaces the bucket's whole existing policy;
# if the bucket already has one, merge this statement into it instead.
policy = {
    "Version": "2012-10-17",
    "Id": "PersonalizeS3BucketAccessPolicy",
    "Statement": [
        {
            "Sid": "PersonalizeS3BucketAccessPolicy",
            "Effect": "Allow",
            "Principal": {
                "Service": "personalize.amazonaws.com"
            },
            # NOTE(review): "s3:*" is broader than a dataset import needs
            # (AWS docs list s3:GetObject and s3:ListBucket); consider narrowing.
            "Action": [
                "s3:*",
            ],
            "Resource": [
                "arn:aws:s3:::{}".format(bucket),
                "arn:aws:s3:::{}/*".format(bucket)
            ]
        }
    ]
}

s3.put_bucket_policy(Bucket=bucket, Policy=json.dumps(policy))

In [None]:
# Random 5-digit suffix appended to resource names so that repeated runs do
# not collide with resources left from earlier runs. Zero-padding guarantees
# exactly five digits. Slicing str(np.random.uniform()) could instead yield
# an empty string (e.g. '0.5') or a fragment of scientific notation ('1e-05').
suffix = "{:05d}".format(int(np.random.randint(0, 100000)))

In [None]:
iam = boto3.client("iam")

# Trust (assume-role) policy that lets the Personalize service assume the role created below
role_name = "PersonalizeRoleDemo" + suffix
assume_role_policy_document = {
    "Version": "2012-10-17",
    "Statement": [
        {
          "Effect": "Allow",
          "Principal": {
            "Service": "personalize.amazonaws.com"
          },
          "Action": "sts:AssumeRole"
        }
    ]
}

# Create the role that Personalize will use (passed as roleArn to the import jobs)
create_role_response = iam.create_role(
    RoleName = role_name,
    AssumeRolePolicyDocument = json.dumps(assume_role_policy_document)
)

# Attach the AmazonPersonalizeFullAccess managed policy to the role created above
policy_arn = "arn:aws:iam::aws:policy/service-role/AmazonPersonalizeFullAccess"
iam.attach_role_policy(
    RoleName = role_name,
    PolicyArn = policy_arn
)

# Attach the AmazonS3FullAccess managed policy to the role created above.
# NOTE(review): full S3 access on every bucket is broader than reading this
# notebook's bucket; acceptable for a demo, but consider a scoped policy.
iam.attach_role_policy(
    RoleName=role_name,    
    PolicyArn='arn:aws:iam::aws:policy/AmazonS3FullAccess'
)
time.sleep(15) # wait for 15 seconds to allow IAM role policy attachment to propagate

role_arn = create_role_response["Role"]["Arn"]
print(role_arn)

### Dataset Group 생성

In [None]:
# Dataset group that will hold the interactions, items and users datasets.
dataset_group_name = "RetailDemo-dataset-group" + suffix
create_dataset_group_response = personalize.create_dataset_group(name=dataset_group_name)

dataset_group_arn = create_dataset_group_response['datasetGroupArn']
dataset_group_arn

#### Dataset Group이 Active 상태가 될 때까지 대기
Dataset Group 생성은 보통 30초 이내에 Active 상태가 됩니다.

In [None]:
# Poll the dataset group until it is ACTIVE. A CREATE FAILED status or a
# timeout raises here, so later cells do not run against a missing group.
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_group_response = personalize.describe_dataset_group(
        datasetGroupArn = dataset_group_arn
    )
    status = describe_dataset_group_response["datasetGroup"]["status"]
    print("DatasetGroup: {}".format(status))

    if status == "ACTIVE":
        break
    if status == "CREATE FAILED":
        raise RuntimeError("Dataset group creation failed: {}".format(
            describe_dataset_group_response["datasetGroup"].get("failureReason", "no failureReason returned")))

    time.sleep(15)
else:
    raise TimeoutError("Dataset group did not become ACTIVE within 3 hours")

### 스키마 생성

#### Interaction

In [None]:
interaction_schema_name = "RetailDemo-interaction-schema" + suffix

# Interactions schema as (field name, Avro type) pairs; the names match the
# columns of interactions_df written to the interactions CSV.
interaction_fields = [
    ("USER_ID", "string"),
    ("ITEM_ID", "string"),
    ("EVENT_TYPE", "string"),
    ("TIMESTAMP", "long"),
]
schema = {
    "type": "record",
    "name": "Interactions",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [{"name": field_name, "type": field_type} for field_name, field_type in interaction_fields],
    "version": "1.0",
}

create_schema_response = personalize.create_schema(
    name=interaction_schema_name,
    schema=json.dumps(schema),
)

interaction_schema_arn = create_schema_response['schemaArn']
print(json.dumps(create_schema_response, indent=2))

#### Item

In [None]:
item_schema_name="RetailDemo-item-schema" + suffix

# Items schema; the field names match the columns of items_df
# (ITEM_ID, NAME, CATEGORY_L1, STYLE, PRODUCT_DESCRIPTION, PRICE).
schema = {
    "type": "record",
    "name": "Items",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
    {
        "name": "ITEM_ID",
        "type": "string"
    },
    {
        "name": "NAME",
        "type": "string"
    },
    {
      # String field flagged as categorical metadata
      "name": "CATEGORY_L1",
      "type": [
        "string"
      ],
      "categorical": True
    },
    {
      # String field flagged as categorical metadata
      "name": "STYLE",
      "type": [
        "string"
      ],
      "categorical": True
    },
    {
        "name": "PRODUCT_DESCRIPTION",
        "type": "string"
    },
    {
      "name": "PRICE",
      "type": "float"
    },    
    ],
    "version": "1.0"
}

# Register the schema; json.dumps serializes Python True as JSON true.
create_metadata_schema_response = personalize.create_schema(      
    name = item_schema_name,
    schema = json.dumps(schema)
)

item_schema_arn = create_metadata_schema_response['schemaArn']
print(json.dumps(create_metadata_schema_response, indent=2))

#### User

In [None]:
user_schema_name="RetailDemo-user-schema" + suffix

# Users schema.
# NOTE(review): users_df (written to the users CSV) also contains an AGE
# column that this schema does not declare. Confirm Personalize accepts the
# extra column, or add AGE here / drop it from the CSV.
schema = {
    "type": "record",
    "name": "Users",
    "namespace": "com.amazonaws.personalize.schema",
    "fields": [
    {
        "name": "USER_ID",
        "type": "string"
    },
    {
      "name": "USER_NAME",
      "type": "string"
    },        
    {
      # String field flagged as categorical metadata
      "name": "GENDER",
      "type": [
        "string"
      ],
      "categorical": True
    }        
    ],
    "version": "1.0"
}

# Register the schema; json.dumps serializes Python True as JSON true.
create_metadata_schema_response = personalize.create_schema(      
    name = user_schema_name,
    schema = json.dumps(schema)
)

user_schema_arn = create_metadata_schema_response['schemaArn']
print(json.dumps(create_metadata_schema_response, indent=2))

## 5. Personalize : Create Dataset

#### Interaction

In [None]:
# Interactions dataset in the dataset group, bound to the interactions schema.
dataset_type = "INTERACTIONS"
create_dataset_response = personalize.create_dataset(
    name="RetailDemo-interaction-dataset" + suffix,
    schemaArn=interaction_schema_arn,
    datasetGroupArn=dataset_group_arn,
    datasetType=dataset_type,
)

interaction_dataset_arn = create_dataset_response['datasetArn']
print(json.dumps(create_dataset_response, indent=2))

#### Item

In [None]:
# Items dataset in the dataset group, bound to the items schema.
dataset_type = "ITEMS"
create_item_dataset_response = personalize.create_dataset(
    name="RetailDemo-item-dataset" + suffix,
    schemaArn=item_schema_arn,
    datasetGroupArn=dataset_group_arn,
    datasetType=dataset_type,
)

item_dataset_arn = create_item_dataset_response['datasetArn']
print(json.dumps(create_item_dataset_response, indent=2))

#### User

In [None]:
# Users dataset in the dataset group, bound to the users schema.
dataset_type = "USERS"
create_user_dataset_response = personalize.create_dataset(
    name="RetailDemo-user-dataset" + suffix,
    schemaArn=user_schema_arn,
    datasetGroupArn=dataset_group_arn,
    datasetType=dataset_type,
)

user_dataset_arn = create_user_dataset_response['datasetArn']
print(json.dumps(create_user_dataset_response, indent=2))

#### Dataset 생성을 위해 1분 대기

In [None]:
# Poll until all three datasets are ACTIVE instead of sleeping a fixed 60 s,
# which may be too short or needlessly long. The import jobs in the next
# section reference these datasets.
datasets_to_wait = {
    "INTERACTIONS": interaction_dataset_arn,
    "ITEMS": item_dataset_arn,
    "USERS": user_dataset_arn,
}
max_time = time.time() + 30*60 # 30 minutes
while datasets_to_wait and time.time() < max_time:
    # Iterate over a snapshot because finished datasets are removed in the loop.
    for label, arn in list(datasets_to_wait.items()):
        dataset_status = personalize.describe_dataset(datasetArn = arn)["dataset"]["status"]
        print("{} dataset: {}".format(label, dataset_status))
        if dataset_status == "ACTIVE":
            del datasets_to_wait[label]
        elif dataset_status == "CREATE FAILED":
            raise RuntimeError("{} dataset creation failed".format(label))
    if datasets_to_wait:
        time.sleep(15)

if datasets_to_wait:
    raise TimeoutError("Datasets not ACTIVE after 30 minutes: {}".format(", ".join(datasets_to_wait)))

## 6. Personalize : Import Dataset 

#### Interaction Dataset - Create Import Job

In [None]:
# Start the job that imports the interactions CSV from S3 into the
# interactions dataset; Personalize uses role_arn to read the file.
# The job name follows the "RetailDemo-..." prefix used by every other resource.
create_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName = "RetailDemo-interaction-dataset-import" + suffix,
    datasetArn = interaction_dataset_arn,
    dataSource = {
        "dataLocation": "s3://{}/{}".format(bucket, its_filename)
    },
    roleArn = role_arn
)

# Variable name (sic) kept because the wait cell below reads it.
interation_dataset_import_job_arn = create_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_dataset_import_job_response, indent=2))

#### Item Dataset - Create Import Job

In [None]:
# Import the items CSV from S3 into the items dataset using the Personalize role.
item_import_source = {"dataLocation": "s3://{}/{}".format(bucket, items_filename)}
create_item_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName="RetailDemo-item-dataset-import" + suffix,
    datasetArn=item_dataset_arn,
    dataSource=item_import_source,
    roleArn=role_arn,
)

item_dataset_import_job_arn = create_item_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_item_dataset_import_job_response, indent=2))

#### User Dataset - Create Import Job

In [None]:
# Import the users CSV from S3 into the users dataset using the Personalize role.
user_import_source = {"dataLocation": "s3://{}/{}".format(bucket, users_filename)}
create_user_dataset_import_job_response = personalize.create_dataset_import_job(
    jobName="RetailDemo-user-dataset-import" + suffix,
    datasetArn=user_dataset_arn,
    dataSource=user_import_source,
    roleArn=role_arn,
)

user_dataset_import_job_arn = create_user_dataset_import_job_response['datasetImportJobArn']
print(json.dumps(create_user_dataset_import_job_response, indent=2))

#### 모든 Dataset의 Import가 완료되어야 다음 단계인 Training을 진행할 수 있습니다.
#### 따라서 아래 3개 데이터셋의 Import Job이 모두 ACTIVE 상태가 될 때까지 대기합니다.

#### Interaction

In [None]:
%%time

status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = interation_dataset_import_job_arn
    )
    
    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}".format(status))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))
    
    if status == "ACTIVE" or status == "CREATE FAILED":
        break
        
    time.sleep(15)

#### Item

In [None]:
# Poll the items import job until it is ACTIVE. CREATE FAILED or a timeout
# raises here, so training does not start on incomplete data.
status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = item_dataset_import_job_arn
    )

    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    # NOTE(review): `latestDatasetImportJobRun` does not appear to be part of the
    # Personalize DatasetImportJob response; the first branch is presumably taken.
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}".format(status))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))

    if status == "ACTIVE":
        break
    if status == "CREATE FAILED":
        raise RuntimeError("Items import job failed: {}".format(
            dataset_import_job.get("failureReason", "no failureReason returned")))

    time.sleep(15)
else:
    raise TimeoutError("Items import job did not become ACTIVE within 3 hours")

#### User

In [None]:
# Poll the users import job until it is ACTIVE. CREATE FAILED or a timeout
# raises here, so training does not start on incomplete data.
status = None
max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:
    describe_dataset_import_job_response = personalize.describe_dataset_import_job(
        datasetImportJobArn = user_dataset_import_job_arn
    )

    dataset_import_job = describe_dataset_import_job_response["datasetImportJob"]
    # NOTE(review): `latestDatasetImportJobRun` does not appear to be part of the
    # Personalize DatasetImportJob response; the first branch is presumably taken.
    if "latestDatasetImportJobRun" not in dataset_import_job:
        status = dataset_import_job["status"]
        print("DatasetImportJob: {}".format(status))
    else:
        status = dataset_import_job["latestDatasetImportJobRun"]["status"]
        print("LatestDatasetImportJobRun: {}".format(status))

    if status == "ACTIVE":
        break
    if status == "CREATE FAILED":
        raise RuntimeError("Users import job failed: {}".format(
            dataset_import_job.get("failureReason", "no failureReason returned")))

    time.sleep(15)
else:
    raise TimeoutError("Users import job did not become ACTIVE within 3 hours")

## 7. Personalize : Create Solution

### Create Solution - "AWS-USER-PERSONALIZATION"

In [None]:
# Define the solution details.
# Append the run suffix, as every other resource name in this notebook does.
# Solution names must be unique per account and region, so a fixed name makes
# a second run of this notebook fail in create_solution.
solution_name = "RetailDemo-user-personalization" + suffix
recipe_arn = "arn:aws:personalize:::recipe/aws-user-personalization"
perform_hpo = False # set to True to run hyperparameter optimization (HPO)

# Create the solution (training configuration only; training itself starts
# with create_solution_version in the next cell).
create_solution_response = personalize.create_solution(
    name=solution_name,
    recipeArn=recipe_arn,
    performHPO=perform_hpo,
    datasetGroupArn = dataset_group_arn,
    solutionConfig = {
        # Fixed hyperparameter values; the API takes them as strings.
        "algorithmHyperParameters": {
            "bptt": "32",
            "hidden_dimension": "149",
            "recency_mask": "true"
        },
        "featureTransformationParameters": {
            "max_user_history_length_percentile": "0.99",
            "min_user_history_length_percentile": "0.00"
        }
    }
)

# Get the solution ARN
solution_arn = create_solution_response['solutionArn']
print(f'Solution ARN: {solution_arn}')

### Create Solution Version

In [None]:
# Start training a new version of the solution; training runs asynchronously
# and is polled in the next cell.
create_solution_version_response = personalize.create_solution_version(solutionArn=solution_arn)
solution_version_arn = create_solution_version_response['solutionVersionArn']
print(f'Solution version ARN: {solution_version_arn}')

#### Solution Version이 ACTIVE(완료) 상태가 될 때까지 대기
약 20~30분 정도 소요됩니다.


In [None]:
%%time

max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:

    # status_aws_user_personalization
    describe_solution_response = personalize.describe_solution_version(
        solutionVersionArn = solution_version_arn
    )  
    status_solution = describe_solution_response['solutionVersion']["status"]
    print("status_user-personalization : {}".format(status_solution))
    
        
    if (status_solution == "ACTIVE" or status_solution == "CREATE FAILED") :
        break
    print("-------------------------------------->")
    time.sleep(30)

print("Generating solution version is completed")

## 8. Personalize : Create Campaign

In [None]:
# Deploy the trained solution version behind a campaign provisioned for a
# minimum of 1 transaction per second.
create_campaign_response = personalize.create_campaign(
    name='RetailDemo-campaign' + suffix,
    solutionVersionArn=solution_version_arn,
    minProvisionedTPS=1,
)
campaign_arn = create_campaign_response['campaignArn']


#### Campaign 생성 완료까지 대기
약 7분 정도 소요됩니다.

In [None]:
%%time

max_time = time.time() + 3*60*60 # 3 hours
while time.time() < max_time:

    # status_aws_user_personalization
    describe_campaign_response = personalize.describe_campaign(
        campaignArn = campaign_arn
    )  
    status_campaign = describe_campaign_response['campaign']["status"]
    print("status_creating_campaign : {}".format(status_campaign))
    
        
    if (status_campaign == "ACTIVE" or status_campaign == "CREATE FAILED") :
        break
    print("-------------------------------------->")
    time.sleep(60)

print("Creating Campaign is completed")

#### 변수 저장
clean-up을 위해 필요한 변수 저장

In [None]:
%store dataset_group_arn
%store interaction_schema_arn
%store item_schema_arn
%store user_schema_arn
%store interaction_dataset_arn
%store item_dataset_arn
%store user_dataset_arn
%store solution_arn
%store campaign_arn


# 아래 Personalize Campaign ARN으로 추론합니다.
Lambda Function은 아래에 출력되는 Personalize Campaign ARN을 사용해 Personalize Campaign을 호출합니다.

In [None]:
# Campaign ARN used for inference (e.g. configured in the Lambda function).
print("Personalize Campaign ARN : ", campaign_arn)