In [None]:
!pip install --upgrade sagemaker

In [None]:
import sagemaker
import boto3
import sys
import io
import pandas as pd
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.metrics import accuracy_score

from sagemaker import get_execution_role

In [None]:
# Shared configuration: the S3 key prefix used for every artifact this notebook
# writes, the IAM execution role, and a SageMaker session with its region and
# default S3 bucket.
prefix = "redshift-featurestore-blog"
role = get_execution_role()

sagemaker_session = sagemaker.Session()
region = sagemaker_session.boto_region_name
s3_bucket_name = sagemaker_session.default_bucket()

print(f"Region is {region}", f"IAM role is {role}", sep="\n")

In [None]:
# ListFeatureGroups returns results one page at a time, so follow NextToken until
# the listing is exhausted; otherwise the groups created in the previous step may
# not appear. The cell displays the names of all feature groups in the region.
sm_client = sagemaker_session.boto_session.client("sagemaker", region_name=region)
feature_group_summaries = []
list_kwargs = {}
while True:
    page = sm_client.list_feature_groups(**list_kwargs)
    feature_group_summaries.extend(page.get("FeatureGroupSummaries", []))
    next_token = page.get("NextToken")
    if not next_token:
        break
    list_kwargs["NextToken"] = next_token

[summary["FeatureGroupName"] for summary in feature_group_summaries]

In [None]:
# The previous step (step 4) could be run two ways, and each trial created its
# own series of feature groups distinguished by this name prefix, so the 4a and
# 4b experiments never produce duplicates. Use the prefix that matches the
# option you ran:
#   4a (AWS Glue interactive sessions):  feature_group_prefix = 'redshift-sm-demo-4a-'
#   4b (SageMaker Processing job):       feature_group_prefix = 'redshift-sm-demo-4b-'
feature_group_prefix = 'redshift-sm-demo-4b-'

def get_table_name(feature_group_name, session=None):
    """Return the Glue Data Catalog table name of a feature group's offline store.

    Args:
        feature_group_name: Name of the SageMaker Feature Store feature group.
        session: Object exposing ``describe_feature_group(name)`` (normally a
            ``sagemaker.Session``). Defaults to the notebook-level
            ``sagemaker_session`` so existing one-argument calls keep working.

    Returns:
        The ``OfflineStoreConfig -> DataCatalogConfig -> TableName`` value from
        the describe response.

    Raises:
        KeyError: if the describe response lacks any of those nested keys
            (e.g. a feature group without an offline store config).
    """
    if session is None:
        session = sagemaker_session
    description = session.describe_feature_group(feature_group_name)
    return description['OfflineStoreConfig']['DataCatalogConfig']['TableName']

# Feature group names are "<prefix><entity>" for the users and places
# dimensions and the ratings fact table.
dim_user_feature_group_name = f"{feature_group_prefix}users"
dim_place_feature_group_name = f"{feature_group_prefix}places"
fact_rating_feature_group_name = f"{feature_group_prefix}ratings"
print(
    f"users_table : {dim_user_feature_group_name} \n"
    f"places_table : {dim_place_feature_group_name} \n"
    f"ratings_table : {fact_rating_feature_group_name} "
)

In [None]:
from sagemaker.feature_store.feature_group import FeatureGroup

# One FeatureGroup handle per name defined above, all bound to the notebook's
# session. The generator preserves order: users, places, ratings.
dim_user_feature_group, dim_place_feature_group, fact_rating_feature_group = (
    FeatureGroup(name=fg_name, sagemaker_session=sagemaker_session)
    for fg_name in (
        dim_user_feature_group_name,
        dim_place_feature_group_name,
        fact_rating_feature_group_name,
    )
)

In [None]:
# Optional cleanup: uncomment to delete the three feature groups when you no
# longer need them. This is destructive; only run it deliberately.
# dim_user_feature_group.delete()
# dim_place_feature_group.delete()
# fact_rating_feature_group.delete()

In [None]:
# Optional: uncomment to inspect the configuration of a feature group
# (for example, to check its offline store settings).
# dim_user_feature_group.describe()
# dim_place_feature_group.describe()
# fact_rating_feature_group.describe()

## Build training dataset

Option 1: Build the dataset with the Feature Store `DatasetBuilder` API

In [None]:
from sagemaker.feature_store.dataset_builder import DatasetBuilder

# Option 1: let DatasetBuilder query the ratings feature group's offline store.
# ratingid is the record identifier and timestamp the event-time feature.
# to_dataframe() is indexed with [0] to keep only the DataFrame.
fact_rating_builder = DatasetBuilder(
    sagemaker_session=sagemaker_session,
    base=fact_rating_feature_group,
    output_path=f"s3://{s3_bucket_name}/{prefix}",
    record_identifier_feature_name='ratingid',
    event_time_identifier_feature_name='timestamp',
)
fact_rating_dataset = fact_rating_builder.to_dataframe()[0]

fact_rating_dataset.head()

In [None]:
# The record identifier and event time are Feature Store bookkeeping and are not used downstream.
fact_rating_dataset = fact_rating_dataset.drop(['ratingid', 'timestamp'], axis=1)

In [None]:
# Same DatasetBuilder pattern for the places dimension: placeid is the record
# identifier, timestamp the event-time feature. [0] keeps only the DataFrame.
dim_place_builder = DatasetBuilder(
    sagemaker_session=sagemaker_session,
    base=dim_place_feature_group,
    output_path=f"s3://{s3_bucket_name}/{prefix}",
    record_identifier_feature_name='placeid',
    event_time_identifier_feature_name='timestamp',
)
dim_place_dataset = dim_place_builder.to_dataframe()[0]

dim_place_dataset.head()

In [None]:
# Keep placeid (the join key) but drop the event-time column.
dim_place_dataset = dim_place_dataset.drop('timestamp', axis=1)

Option 2: Query the feature group's offline store directly with Athena, using the Glue Data Catalog table that Feature Store creates automatically

In [None]:
# Option 2: query the users feature group's offline-store table directly with
# Athena. athena_query() gives a query object bound to that table.
dim_user_query = dim_user_feature_group.athena_query()
dim_user_table = dim_user_query.table_name

# Double quotes make Athena treat the table name as a quoted identifier.
dim_user_query_string = f'SELECT * FROM "{dim_user_table}"'
print(dim_user_query_string)

In [None]:
# Athena writes its result files under the notebook's S3 prefix.
athena_output_location = f"s3://{s3_bucket_name}/{prefix}"
dim_user_query.run(query_string=dim_user_query_string, output_location=athena_output_location)

# Block until the query finishes, then load its result set into pandas.
dim_user_query.wait()
dim_user_dataset = dim_user_query.as_dataframe()

dim_user_dataset.head()

In [None]:
# A raw SELECT * on the offline-store table returns one row per write, including
# the metadata columns write_time / api_invocation_time / is_deleted.
# NOTE(review): re-ingesting a user is assumed to add another row, and deletes
# are assumed to be recorded as rows with is_deleted = true.
# Dropping is_deleted without filtering on it (as before) could duplicate or
# resurrect users in the merge below. So: keep the latest write per userid
# (event time first, write times as tie-breakers), drop users whose latest write
# is a delete, and only then remove the metadata columns.
_metadata_cols = ["timestamp", "write_time", "api_invocation_time", "is_deleted"]
_latest_per_user = (
    dim_user_dataset
    .sort_values(["timestamp", "api_invocation_time", "write_time"], kind="mergesort")
    .drop_duplicates(subset="userid", keep="last")
)
# is_deleted may be parsed as bool or as the strings "true"/"false"; normalise both.
_is_deleted = _latest_per_user["is_deleted"].astype(str).str.lower().eq("true")
dim_user_dataset = (
    _latest_per_user[~_is_deleted]
    .drop(columns=_metadata_cols)
    .reset_index(drop=True)
)

In [None]:
# Join every rating with its user attributes (on userid) and place attributes
# (on placeid). validate='many_to_one' makes pandas raise a MergeError if a key
# appears more than once on the dimension side, instead of silently duplicating
# rating rows and skewing the training set.
ratings_with_users = pd.merge(
    fact_rating_dataset, dim_user_dataset, on='userid', validate='many_to_one'
)
final_data = pd.merge(
    ratings_with_users, dim_place_dataset, on='placeid', validate='many_to_one'
)
final_data.head()

In [None]:
# (number of rows, number of columns) of the joined dataset.
(len(final_data), len(final_data.columns))

In [None]:
# True if any cell of the joined dataset is missing.
final_data.isna().to_numpy().any()

In [None]:
# Encode the boolean smoker flag numerically (True/False -> 1/0).
final_data = final_data.astype({'user_smoker': int})

In [None]:
# Put the label (rating_overall) in column 0 and keep every other column in its
# existing order; the CSVs written below use column 0 as the label.
final_data = final_data.reindex(
    columns=['rating_overall'] + [c for c in final_data.columns if c != 'rating_overall']
)

In [None]:
# Show every column when displaying frames (this is a global pandas setting).
pd.options.display.max_columns = None
final_data.head()

In [None]:
# Column 0 is the label. All remaining columns (ids and features) stay together
# for now so the held-out test split can still carry the ids.
label_col = final_data.columns[0]
feature_id_col = final_data.columns[1:]

labels = final_data[label_col].values.astype('float32')
features_ids = final_data[feature_id_col].values

In [None]:
# Hold out 10% of rows for the final batch-transform test, stratified on the
# label so each class is represented in both splits. random_state pins the
# shuffle so the split, and the accuracy reported at the end, are reproducible.
X_train_all, X_test, y_train_all, y_test = train_test_split(
    features_ids, labels, test_size=0.1, stratify=labels, random_state=42
)

In [None]:
# Drop the first two feature columns, assumed to be userid and placeid (the
# order produced by the merge above; TODO confirm if the schema changes), and
# cast the rest to float32 for training.
# NOTE: this rebinds X_train_all, so re-running only this cell drops two more columns.
X_train_all = np.asarray(X_train_all[:, 2:], dtype='float32')

In [None]:
# Split the remaining rows 80/20 into train and validation, again stratified on
# the label and with a fixed random_state so training runs are reproducible.
X_train, X_validation, y_train, y_validation = train_test_split(
    X_train_all, y_train_all, test_size=0.2, stratify=y_train_all, random_state=42
)

In [None]:
# Prepare data in csv layout: label in column 0 followed by the feature columns.
def _label_first_frame(y, X):
    """Stack the 1-D label vector ``y`` as column 0 in front of matrix ``X`` and wrap it in a DataFrame."""
    return pd.DataFrame(np.column_stack((y, X)))

train_data = _label_first_frame(y_train, X_train)
validation_data = _label_first_frame(y_validation, X_validation)
test_data = _label_first_frame(y_test, X_test)

In [None]:
# Write the train/validation splits without header or index (label in column 0).
for frame, csv_name in ((train_data, 'train_data.csv'), (validation_data, 'validation_data.csv')):
    frame.to_csv(csv_name, header=False, index=False)
# The held-out test split is exported later, after its label column is removed.
# test_data.to_csv('test_data.csv', header=False, index=False)

In [None]:
# Upload the CSVs under <prefix>/train and <prefix>/validation. Reuse the
# notebook's configured session rather than building a new sagemaker.Session()
# for every call.
sagemaker_session.upload_data('train_data.csv', bucket=s3_bucket_name, key_prefix=prefix + '/train')
sagemaker_session.upload_data('validation_data.csv', bucket=s3_bucket_name, key_prefix=prefix + '/validation')

## Model training

In [None]:
# Train XGBoost with the SageMaker built-in algorithm container, version 1.5-1.
container = sagemaker.image_uris.retrieve("xgboost", region, "1.5-1")

# multi:softmax with num_class=3 predicts a single class id per row, so
# rating_overall is presumably encoded as 0/1/2 (TODO confirm against the data).
# NOTE(review): the "alt:" trailing values appear to be alternative settings from
# an earlier run or tuning; confirm before relying on them.
hyperparameters = {
    "num_class": "3",
    "max_depth": "9",  # alt: 10
    "eta": "0.2",
    "gamma": "4",
    "min_child_weight": "1.3",  # alt: 1.6
    "subsample": "0.7",
    "objective": "multi:softmax",
    "num_round": "254",  # alt: 266
    "verbosity": "2",
    "alpha": "0.07",  # alt: 0.03
}

output_path = f"s3://{s3_bucket_name}/{prefix}/output"

# Reuse the notebook's session (same region and default bucket) instead of
# creating a fresh sagemaker.Session() just for the estimator.
xgb_estimator = sagemaker.estimator.Estimator(
    container,
    role,
    base_job_name="feature-store-xgb",
    hyperparameters=hyperparameters,
    instance_type="ml.m5.2xlarge",
    instance_count=1,
    volume_size=5,
    output_path=output_path,
    sagemaker_session=sagemaker_session,
)

In [None]:
# One CSV channel per split, pointing at the S3 prefixes the files were uploaded to.
train_input, validation_input = (
    sagemaker.inputs.TrainingInput(f"s3://{s3_bucket_name}/{prefix}/{channel}/", content_type="csv")
    for channel in ("train", "validation")
)

xgb_estimator.fit({"train": train_input, "validation": validation_input})

## Batch Transform

In [None]:
# Ground-truth labels of the held-out split: column 0 of test_data, as floats.
y_test = test_data.iloc[:, 0].to_numpy().flatten().astype(float)

In [None]:
# Remove the label (column 0) and placeid (column 2). userid (column 1) stays in
# front of the feature columns so predictions can be joined back to it.
test_data_nolabel = test_data.drop(columns=[0, 2])
test_data_nolabel.head()

In [None]:
# Write the unlabeled test rows without header/index (header=False is the
# documented boolean form) and upload them with the notebook's session.
test_data_nolabel.to_csv('test_data_nolabel.csv', header=False, index=False)
sagemaker_session.upload_data('test_data_nolabel.csv', bucket=s3_bucket_name, key_prefix=prefix + '/test')

In [None]:
# Batch transform reads the uploaded test CSV and writes results under batch_output/.
s3_base_uri = f's3://{s3_bucket_name}/{prefix}'
s3_batch_input = f'{s3_base_uri}/test/test_data_nolabel.csv'
s3_batch_output = f'{s3_base_uri}/batch_output'

In [None]:
# Transformer for the trained model: a single ml.m5.xlarge instance, CSV
# responses, and output records reassembled line by line.
xgb_transformer = xgb_estimator.transformer(
    instance_count=1,
    instance_type='ml.m5.xlarge',
    accept='text/csv',
    assemble_with="Line",
    output_path=s3_batch_output,
)

In [None]:
# Send each CSV line without its first column (userid) to the model. Join the
# prediction onto the original input row, then keep only the first (userid) and
# last (prediction) fields of the joined record.
xgb_transformer.transform(
    data=s3_batch_input,
    data_type='S3Prefix',
    content_type='text/csv',
    split_type='Line',
    input_filter="$[1:]",
    join_source="Input",
    output_filter="$[0,-1]",
)

xgb_transformer.wait()

In [None]:
# Batch transform names each output object after its input file plus ".out".
batch_output_location = f'{s3_batch_output}/test_data_nolabel.csv.out'
print(f'Batch transform output location: {batch_output_location}')

In [None]:
# Headerless CSV: column 0 is the joined userid, column 1 the predicted class.
batch_output = pd.read_csv(batch_output_location, encoding="ISO-8859-1", header=None)
batch_output.head()

In [None]:
# Column 1 of the batch output holds the predicted class. Store it under a new
# name so batch_output stays a DataFrame and this cell can be re-run (previously
# it rebound batch_output to an ndarray, so a second run failed on .iloc).
batch_predictions = batch_output.iloc[:, 1].to_numpy().flatten().astype(float)
acc = accuracy_score(y_test, batch_predictions)
print(f"Accuracy on test data is {acc}")

## (Optional) Hyperparameter tuning (HPO)

In [None]:
from sagemaker.tuner import (
    CategoricalParameter,
    ContinuousParameter,
    HyperparameterTuner,
    IntegerParameter,
)

# Search space for the tuner, using the same XGBoost estimator as above.
hyperparameter_ranges = dict(
    min_child_weight=ContinuousParameter(1, 5),
    max_depth=IntegerParameter(1, 10),
    num_round=IntegerParameter(1, 300),
    alpha=ContinuousParameter(0, 0.3),
)

objective_metric_name = "validation:accuracy"

# Up to 200 training jobs, 10 in parallel; expect this to take a while and incur cost.
tuner = HyperparameterTuner(
    estimator=xgb_estimator,
    objective_metric_name=objective_metric_name,
    hyperparameter_ranges=hyperparameter_ranges,
    max_jobs=200,
    max_parallel_jobs=10,
)

tuner.fit({"train": train_input, "validation": validation_input})

In [None]:
# Name of the best training job found by the tuner.
best_training_job_name = tuner.best_training_job()
best_training_job_name

In [None]:
# Hyperparameters used by the tuner's best training job.
best_estimator = tuner.best_estimator()
best_estimator.hyperparameters()