### Make sure that you have selected SparkAnalytics 2.0 as the image and Glue Python [PySpark and Ray] as the kernel.

To change a notebook's image or kernel:
 - Choose the image or kernel name in the notebook menu.
 - In the Set up notebook environment pop-up window, open the Image or Kernel dropdown menu.
 - From the dropdown menu, choose "SparkAnalytics 2.0" as the image and "Glue Python [PySpark and Ray]" as the kernel.
 - Choose ml.t3.medium as the instance type, then choose Select.
 - Wait for the kernel's status to show as idle, which indicates that the kernel has started.


### Ingest Data with Glue Interactive Session

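This notebook demonstrates how to read Amazon Redshift table data from an AWS Glue interactive session (GIS), apply simple feature transformations, and ingest the results into Amazon SageMaker Feature Store.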

_The Amazon SageMaker Feature Store Spark Connector requires specific Spark connector JARs when the session is initialized. You can upload them to your Amazon S3 bucket. After you’ve uploaded them, you must provide the JARs to the GIS using the following command._

```
%extra_jars '<S3 URI>'
```

_To install the Spark Connector in the AWS Glue runtime, use the %additional_python_modules magic command within the GIS notebook. AWS Glue uses pip to install the modules that you’ve specified under %additional_python_modules._

```
%additional_python_modules '<module1>,<module2>'
```

***Before you start the AWS Glue session, you must use both of the preceding magic commands.***


#### Replace `<region-code>` and `<acct-id>` with your current Region code and account ID.
You can also replace the `%extra_jars` line below with the JAR location from 1-uploadJar.ipynb.
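
The default SageMaker bucket name follows the pattern `sagemaker-<region-code>-<acct-id>`, so the JAR URI can be assembled from those two values. The following is a minimal sketch only: `build_jar_uri` is a hypothetical helper (not part of any AWS SDK), and the Region and account ID shown are placeholders. Run it outside this notebook, because executing Python code in the Glue kernel starts the session before the magics are applied.

```python
def build_jar_uri(region_code: str, account_id: str) -> str:
    """Assemble the S3 URI passed to %extra_jars (hypothetical helper)."""
    bucket = f"sagemaker-{region_code}-{account_id}"
    key = "sagemaker-feature-store-spark-jar/sagemaker-feature-store-spark-sdk.jar"
    return f"s3://{bucket}/{key}"


# Placeholder values for illustration only.
print(build_jar_uri("us-east-1", "123456789012"))
```

Paste the printed URI into the `%extra_jars` line of the next cell.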

In [None]:
%connections 'RS-SM-Demo-glue-Connection'
%session_id_prefix 'rs-glueinteractivesession'
%worker_type 'G.2X'
%idle_timeout 720
%additional_python_modules 'sagemaker,sagemaker-feature-store-pyspark'
%extra_jars 's3://sagemaker-<region-code>-<acct-id>/sagemaker-feature-store-spark-jar/sagemaker-feature-store-spark-sdk.jar'
%%configure
{
    "--enable-auto-scaling": "true",
    "--enable-metrics": "true",
    "--enable-continuous-cloudwatch-log": "true",
    "--enable-continuous-log-filter": "true"
}

In [None]:
# Source configuration 
redshift_secret = 'SecretForRedshiftSageMakerDemo2023'
redshift_schema = 'sagemakerdemo'

# Target configuration - Feature store
feature_group_prefix = 'redshift-sm-demo-4a-'

### Import Dependencies

In [None]:
from awsglue.transforms import *
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from pyspark.sql import SQLContext
from pyspark.sql.functions import *
import pyspark.sql.functions as fn
import boto3
import json
import time
import sagemaker
from sagemaker.feature_store.feature_group import FeatureDefinition, FeatureGroup, FeatureTypeEnum, DataCatalogConfig
from pyspark.sql import SparkSession
from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager
import feature_store_pyspark
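# Shared session objects used throughout the notebook
sagemaker_session = sagemaker.Session()
spark = SparkSession.builder.getOrCreate()
feature_store_manager = FeatureStoreManager()
# Temporary S3 directory used when reading from Redshift
s3_bucket = sagemaker_session.default_bucket()
rs_tmp_dir = f"s3://{s3_bucket}/redshift_tmp/"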

region = boto3.Session().region_name
s3_client = boto3.client('s3', region_name=region)


#### Retrieve Redshift connection details from AWS Secrets Manager

In [None]:
glueContext = GlueContext(SparkContext.getOrCreate())
client = boto3.client('secretsmanager')
response = client.get_secret_value(SecretId=redshift_secret)
database_secrets = json.loads(response['SecretString'])
username = database_secrets['username']
password = database_secrets['password']
host = database_secrets['host']
port = database_secrets['port']
engine = database_secrets['engine']
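
The cell above assumes the secret is a JSON document with `username`, `password`, `host`, `port`, and `engine` keys. As a rough sketch, a check like the following fails early with a clear message if one of them is missing. `parse_redshift_secret` is a hypothetical helper, and the example values are made up.

```python
import json

REQUIRED_KEYS = ("username", "password", "host", "port", "engine")


def parse_redshift_secret(secret_string: str) -> dict:
    """Parse a SecretString and fail early if a required key is missing."""
    secret = json.loads(secret_string)
    missing = [key for key in REQUIRED_KEYS if key not in secret]
    if missing:
        raise KeyError(f"Secret is missing keys: {missing}")
    return secret


# Made-up values for illustration; a real secret comes from get_secret_value.
example = json.dumps({
    "username": "awsuser",
    "password": "example-password",
    "host": "example-cluster.abc123.us-east-1.redshift.amazonaws.com",
    "port": 5439,
    "engine": "redshift",
})
print(sorted(parse_redshift_secret(example)))
```

In the notebook, the argument would be `response['SecretString']`.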


In [None]:
def create_connection(tablename):
    # Build the connection options for one Redshift table.
    # Note: the database name "dev" is hardcoded in the JDBC URL.
    rs_conn = {
        "url": "jdbc:" + engine + "://" + host + ":" + str(port) + "/dev",
        "dbtable": redshift_schema + "." + tablename,
        "user": username,
        "password": password,
        "redshiftTmpDir": rs_tmp_dir
    }
    return rs_conn
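
The JDBC URL in `create_connection` is built by string concatenation with the database name fixed to `dev`. If your cluster uses a different database, one option is to make it a parameter. This is a sketch under that assumption: `build_jdbc_url` is a hypothetical helper and the host below is a placeholder.

```python
def build_jdbc_url(engine: str, host: str, port, database: str = "dev") -> str:
    """Return a JDBC URL of the form jdbc:<engine>://<host>:<port>/<database>."""
    return f"jdbc:{engine}://{host}:{port}/{database}"


# Placeholder host for illustration only.
print(build_jdbc_url("redshift", "example-host", 5439))
print(build_jdbc_url("redshift", "example-host", 5439, database="analytics"))
```

With the default argument, the result matches the URL that `create_connection` produces.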

In [None]:
users = glueContext.create_data_frame_from_options("redshift",create_connection("dim_user"))
places = glueContext.create_data_frame_from_options("redshift",create_connection("dim_place"))
ratings = glueContext.create_data_frame_from_options("redshift",create_connection("fact_rating"))

In [None]:
users_cnt = users.count()
places_cnt = places.count()
ratings_cnt = ratings.count()
print(f"users_cnt :{users_cnt} \nplaces_cnt : {places_cnt} \nratings_cnt : {ratings_cnt}")

### Feature engineering
This section applies a few simple feature transformations, for reference only.

### 1. User Data

#### 1-1 Refine timestamp format

In [None]:
users_data = users.withColumn("timestamp", date_format(col("timestamp"),"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) 
users_data.show(1)
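
The pattern `yyyy-MM-dd'T'HH:mm:ss.SSS'Z'` produces an ISO 8601 string with millisecond precision. Because the `Z` is quoted, it is emitted as a literal character and no time zone conversion takes place, so the pattern only makes sense if the source timestamps are already in UTC (an assumption worth checking for your data). The plain-Python sketch below shows the target format on a sample value; `to_feature_store_timestamp` is a hypothetical helper, not part of the notebook.

```python
from datetime import datetime, timezone


def to_feature_store_timestamp(dt: datetime) -> str:
    """Format a datetime like the Spark pattern yyyy-MM-dd'T'HH:mm:ss.SSS'Z'."""
    millis = dt.microsecond // 1000
    return dt.strftime("%Y-%m-%dT%H:%M:%S") + f".{millis:03d}Z"


sample = datetime(2023, 1, 15, 8, 30, 0, 123000, tzinfo=timezone.utc)
print(to_feature_store_timestamp(sample))  # 2023-01-15T08:30:00.123Z
```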

#### 1-2 Drop Columns

In [None]:
drop_cols = ['user_activity','user_ambience','user_birth_year','user_cuisine','user_color','user_dress_preference','user_hijos','user_marital_status','user_payment','user_weight','user_religion','user_height']
users_data = users_data.drop(*drop_cols)
users_data.show(1)

#### 1-3 Impute missing values

In [None]:
# Fill missing values with defaults
users_data = users_data.fillna({"user_smoker" : False, "user_transport" : "PUBLIC" , "user_budget" : "MEDIUM"})
users_data.show(1)

#### 1-4 Ordinal Encoding

In [None]:
user_drink_level_dict = {'ABSTEMIOUS': 0, 'CASUAL DRINKER': 1, 'SOCIAL DRINKER': 2}
user_budget_dict = {'LOW': 0, 'MEDIUM': 1, 'HIGH': 2}
user_transport_dict = {'ON FOOT': 0, 'PUBLIC': 1, 'CAR OWNER': 2}

# user drink
keys = array(list(map(lit, user_drink_level_dict.keys())))
values = array(list(map(lit, user_drink_level_dict.values())))
_map = map_from_arrays(keys, values)
users_data = users_data.withColumn("user_drink_level2", _map.getItem(col("user_drink_level"))).drop("user_drink_level").withColumnRenamed("user_drink_level2", "user_drink_level")
# user budget
keys = array(list(map(lit, user_budget_dict.keys())))
values = array(list(map(lit, user_budget_dict.values())))
_map = map_from_arrays(keys, values)
users_data = users_data.withColumn("user_budget2", _map.getItem(col("user_budget"))).drop("user_budget").withColumnRenamed("user_budget2", "user_budget")
# user transport
keys = array(list(map(lit, user_transport_dict.keys())))
values = array(list(map(lit, user_transport_dict.values())))
_map = map_from_arrays(keys, values)
users_data = users_data.withColumn("user_transport2", _map.getItem(col("user_transport"))).drop("user_transport").withColumnRenamed("user_transport2", "user_transport")
users_data.show(1)
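
Each block above builds a Spark map from a Python dictionary and looks up the column value in it. The lookup semantics can be illustrated in plain Python. With Spark's default (non-ANSI) settings, a value that is not a key in the map yields null, which corresponds to `None` here. This is a sketch of the logic only, not the Spark code itself, and `ordinal_encode` is a hypothetical name.

```python
from typing import Optional

user_budget_dict = {"LOW": 0, "MEDIUM": 1, "HIGH": 2}


def ordinal_encode(value: Optional[str], mapping: dict) -> Optional[int]:
    """Return the ordinal code for value, or None when it is not in the mapping."""
    return mapping.get(value)


samples = ["LOW", "HIGH", "UNKNOWN", None]
print([ordinal_encode(v, user_budget_dict) for v in samples])  # [0, 2, None, None]
```

Only `user_smoker`, `user_transport`, and `user_budget` are filled in step 1-3, so it may be worth checking the encoded `user_drink_level` column for nulls.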

#### 1-5 One-hot Encoding

In [None]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol='user_interest', outputCol='user_interest_numeric')
indexer_fitted = indexer.fit(users_data)
users_data = indexer_fitted.transform(users_data)

indexer2 = StringIndexer(inputCol='user_personality', outputCol='user_personality_numeric')
indexer_fitted2 = indexer2.fit(users_data)
users_data = indexer_fitted2.transform(users_data)

from pyspark.ml.feature import OneHotEncoder
encoder = OneHotEncoder(inputCols=['user_interest_numeric','user_personality_numeric'], outputCols=['user_interest_onehot','user_personality_onehot'], dropLast=False)
df_onehot = encoder.fit(users_data).transform(users_data)
df_onehot.show(1)
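
`StringIndexer` assigns indices by descending label frequency by default (in Spark 3.x, ties are broken alphabetically), and `OneHotEncoder` with `dropLast=False` keeps one slot per label. The plain-Python sketch below mimics that behavior on made-up sample data to show what the resulting vectors look like. `fit_labels` and `one_hot` are hypothetical helpers, not Spark APIs.

```python
from collections import Counter


def fit_labels(values):
    """Order labels by descending frequency, breaking ties alphabetically."""
    counts = Counter(values)
    return sorted(counts, key=lambda label: (-counts[label], label))


def one_hot(value, labels):
    """Return a dense 0/1 list with one slot per label (no category dropped)."""
    return [1.0 if value == label else 0.0 for label in labels]


# Made-up sample values for illustration.
interests = ["VARIETY", "TECHNOLOGY", "VARIETY", "NONE", "TECHNOLOGY", "VARIETY"]
labels = fit_labels(interests)
print(labels)                         # ['VARIETY', 'TECHNOLOGY', 'NONE']
print(one_hot("TECHNOLOGY", labels))  # [0.0, 1.0, 0.0]
```

Because the label order depends on the data, the following cells read it from the fitted indexer's `labels` attribute rather than assuming it.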

In [None]:
# For user interest feature
from pyspark.ml.functions import vector_to_array
import pyspark.sql.functions as F

df_user_interest_onehot = df_onehot.select('*', vector_to_array('user_interest_onehot').alias('_interest_onehot'))
num_categories = len(df_user_interest_onehot.first()['_interest_onehot'])
cols_expanded = [(F.col('_interest_onehot')[i].alias(f'{indexer_fitted.labels[i]}')) for i in range(num_categories)]
df_user_interest_onehot = df_user_interest_onehot.select('userid',*cols_expanded)
df_user_interest_onehot = df_user_interest_onehot.select('userid',col("TECHNOLOGY").alias("user_interest_" + "TECHNOLOGY"), col("ECO-FRIENDLY").alias("user_interest_" + "ECO-FRIENDLY"), col("NONE").alias("user_interest_" + "NONE"), col("VARIETY").alias("user_interest_" + "VARIETY"), col("RETRO").alias("user_interest_" + "RETRO"))
df_user_interest_onehot.show(1)

In [None]:
# For user personality feature
from pyspark.ml.functions import vector_to_array
import pyspark.sql.functions as F

df_user_personality_onehot = df_onehot.select('*', vector_to_array('user_personality_onehot').alias('_personality_onehot'))
num_categories = len(df_user_personality_onehot.first()['_personality_onehot'])
cols_expanded = [(F.col('_personality_onehot')[i].alias(f'{indexer_fitted2.labels[i]}')) for i in range(num_categories)]
df_user_personality_onehot = df_user_personality_onehot.select('userid',*cols_expanded)
df_user_personality_onehot = df_user_personality_onehot.select('userid',col("CONFORMIST").alias("user_personality_" + "CONFORMIST"), col("THRIFTY-PROTECTOR").alias("user_personality_" + "THRIFTY-PROTECTOR"), col("HUNTER-OSTENTATIOUS").alias("user_personality_" + "HUNTER-OSTENTATIOUS"), col("HARD-WORKER").alias("user_personality_" + "HARD-WORKER"))
df_user_personality_onehot.show(1)

In [None]:
# Join together and drop original features
_col = ["user_interest" , "user_personality" , "user_interest_numeric" ,"user_personality_numeric"]
users_data = users_data.drop(*_col).join(df_user_interest_onehot,['userid'],how='inner').join(df_user_personality_onehot,['userid'],how='inner')
users_data.show(1)

#### 1-6 Reorder columns

In [None]:
_column_names = ['userid', 'user_drink_level', 'user_smoker', 'user_budget', 'user_latitude', 'user_longitude','user_transport','user_interest_VARIETY','user_interest_ECO-FRIENDLY',  'user_interest_RETRO', 'user_interest_TECHNOLOGY', 'user_interest_NONE', 'user_personality_CONFORMIST', 'user_personality_THRIFTY-PROTECTOR', 'user_personality_HUNTER-OSTENTATIOUS', 'user_personality_HARD-WORKER', 'timestamp']
users_data = users_data.select(_column_names)
users_data.show(1)

### 2. Place Data

#### 2-1 Refine timestamp format

In [None]:
places_data = places.withColumn("timestamp", date_format(col("timestamp"),"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) 
places_data.show(1)

#### 2-2 Drop columns

In [None]:
drop_cols = ['place_address','place_accessibility','place_ambience','place_area','place_city','place_dress_code','place_cuisine','place_franchise','place_name','place_other_services','place_payment','place_state','place_zip','place_country']
places_data = places_data.drop(*drop_cols)
places_data.show(1)

#### 2-3 Ordinal Encoding

In [None]:
place_alcohol_dict = {'NO_ALCOHOL_SERVED': 0, 'WINE-BEER': 1, 'FULL_BAR': 2}
place_smoking_area_dict = {'NONE': 0, 'NOT PERMITTED': 0, 'ONLY AT BAR': 1, 'SECTION': 2, 'PERMITTED': 3}
place_price_dict = {'LOW': 0, 'MEDIUM': 1, 'HIGH': 2}
place_parking_lot_dict = {'[NONE]': 0, '[PUBLIC]': 1, '[VALET PARKING]': 2, '[YES]': 3}

# place_alcohol
keys = array(list(map(lit, place_alcohol_dict.keys())))
values = array(list(map(lit, place_alcohol_dict.values())))
_map = map_from_arrays(keys, values)
places_data = places_data.withColumn("place_alcohol2", _map.getItem(col("place_alcohol"))).drop("place_alcohol").withColumnRenamed("place_alcohol2", "place_alcohol")
# place_smoking_area
keys = array(list(map(lit, place_smoking_area_dict.keys())))
values = array(list(map(lit, place_smoking_area_dict.values())))
_map = map_from_arrays(keys, values)
places_data = places_data.withColumn("place_smoking_area2", _map.getItem(col("place_smoking_area"))).drop("place_smoking_area").withColumnRenamed("place_smoking_area2", "place_smoking_area")
# place_price
keys = array(list(map(lit, place_price_dict.keys())))
values = array(list(map(lit, place_price_dict.values())))
_map = map_from_arrays(keys, values)
places_data = places_data.withColumn("place_price2", _map.getItem(col("place_price"))).drop("place_price").withColumnRenamed("place_price2", "place_price")
# place_parking_lot
keys = array(list(map(lit, place_parking_lot_dict.keys())))
values = array(list(map(lit, place_parking_lot_dict.values())))
_map = map_from_arrays(keys, values)
places_data = places_data.withColumn("place_parking_lot2", _map.getItem(col("place_parking_lot"))).drop("place_parking_lot").withColumnRenamed("place_parking_lot2", "place_parking_lot")
places_data.show(1)

#### 2-4 Reorder columns

In [None]:
_column_names = ['placeid', 'place_latitude', 'place_longitude', 'place_smoking_area', 'place_alcohol', 'place_price','place_parking_lot','timestamp']
places_data = places_data.select(_column_names)
places_data.show(1)

### 3. Rating Data

#### 3-1 Refine timestamp format

In [None]:
ratings_data = ratings.withColumn("timestamp", date_format(col("timestamp"),"yyyy-MM-dd'T'HH:mm:ss.SSS'Z'")) 
ratings_data.show(1)

#### 3-2 Drop columns

In [None]:
drop_cols = ['rating_food','rating_service']
ratings_data = ratings_data.drop(*drop_cols)
ratings_data.show(1)

#### 3-3 Reorder columns

In [None]:
_column_names = ['userid', 'ratingid', 'placeid', 'rating_overall','timestamp']
ratings_data = ratings_data.select(_column_names)
ratings_data.show(1)

### Ingest Redshift data into Feature Store

In [None]:
def ingest_data_to_feature_store(dataframe, feature_group_name, sagemaker_session):
    # Look up the feature group ARN, then ingest the DataFrame with the Spark connector
    feature_group_arn = sagemaker_session.describe_feature_group(feature_group_name)['FeatureGroupArn']
    feature_store_manager.ingest_data(input_data_frame=dataframe, feature_group_arn=feature_group_arn)
    print(f'Process - ingest_to_feature_store - {feature_group_name} : Completed')
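
Ingestion expects the DataFrame columns to line up with the feature group's feature definitions. As an optional pre-flight check, the column names can be compared against the `FeatureDefinitions` list that `describe_feature_group` returns. The sketch below is a hypothetical helper that operates on plain lists, with made-up definitions standing in for a real response.

```python
def find_schema_mismatches(dataframe_columns, feature_definitions):
    """Compare column names against a feature group's FeatureDefinitions list."""
    expected = {definition["FeatureName"] for definition in feature_definitions}
    actual = set(dataframe_columns)
    return {
        "missing_in_dataframe": sorted(expected - actual),
        "unexpected_in_dataframe": sorted(actual - expected),
    }


# Made-up definitions shaped like the FeatureDefinitions field of the response.
feature_definitions = [
    {"FeatureName": "userid", "FeatureType": "String"},
    {"FeatureName": "user_budget", "FeatureType": "Integral"},
    {"FeatureName": "timestamp", "FeatureType": "String"},
]
print(find_schema_mismatches(["userid", "timestamp", "user_smoker"], feature_definitions))
```

In the notebook, the arguments could be `users_data.columns` and `sagemaker_session.describe_feature_group(users_feature_group_name)['FeatureDefinitions']`.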

In [None]:
users_feature_group_name = feature_group_prefix + 'users'
places_feature_group_name = feature_group_prefix + 'places'
ratings_feature_group_name = feature_group_prefix + 'ratings'

In [None]:
ingest_data_to_feature_store(users_data,users_feature_group_name,sagemaker_session)
ingest_data_to_feature_store(places_data,places_feature_group_name,sagemaker_session)
ingest_data_to_feature_store(ratings_data,ratings_feature_group_name,sagemaker_session)

In [None]:
def wait_for_feature_group_data_ingest(s3_bucket, s3_prefix):
    # Poll S3 once a minute until more than one object exists under the prefix
    print(f"Polling S3 location for data: {s3_prefix}")
    while True:
        objects_in_bucket = s3_client.list_objects(Bucket=s3_bucket, Prefix=s3_prefix)
        if "Contents" in objects_in_bucket and len(objects_in_bucket["Contents"]) > 1:
            break
        else:
            print("Waiting for data in offline store...")
            time.sleep(60)
    print(f"Data available. - {s3_prefix}")
    

def get_table_name(feature_group_name):
    featurestore_table = sagemaker_session.describe_feature_group(feature_group_name)['OfflineStoreConfig']['DataCatalogConfig']['TableName']
    return featurestore_table
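
`wait_for_feature_group_data_ingest` polls indefinitely, which can leave a cell running if the offline store never receives data. A bounded variant is sketched below. `wait_until` is a hypothetical helper, the 30-minute default timeout is an arbitrary assumption, and the example uses a stand-in check instead of a real S3 call.

```python
import time


def wait_until(check, timeout_seconds=1800, interval_seconds=60, sleep=time.sleep):
    """Call check() until it returns True or the timeout elapses."""
    waited = 0
    while waited <= timeout_seconds:
        if check():
            return True
        sleep(interval_seconds)
        waited += interval_seconds
    return False


# Stand-in check that succeeds on the third call; no real S3 access.
calls = {"count": 0}


def fake_check():
    calls["count"] += 1
    return calls["count"] >= 3


print(wait_until(fake_check, timeout_seconds=300, interval_seconds=60, sleep=lambda s: None))
```

In the notebook, `check` could wrap the `list_objects` call and its `Contents` test, and a `False` result could be turned into an error message.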

In [None]:
wait_for_feature_group_data_ingest(s3_bucket, users_feature_group_name)
wait_for_feature_group_data_ingest(s3_bucket, places_feature_group_name)
wait_for_feature_group_data_ingest(s3_bucket, ratings_feature_group_name)

In [None]:
users_table = get_table_name(users_feature_group_name)
places_table = get_table_name(places_feature_group_name)
ratings_table = get_table_name(ratings_feature_group_name)
print(f"users_table : {users_table} \nplaces_table : {places_table} \nratings_table : {ratings_table} ")

#### Read Feature Store data

In [None]:
users_table_df = glueContext.create_data_frame.from_catalog(database="sagemaker_featurestore", table_name=users_table)
places_table_df = glueContext.create_data_frame.from_catalog(database="sagemaker_featurestore", table_name=places_table)
ratings_table_df = glueContext.create_data_frame.from_catalog(database="sagemaker_featurestore", table_name=ratings_table)

In [None]:
users_table_df.show()

In [None]:
places_table_df.show()

In [None]:
ratings_table_df.show()

After you’ve completed your work, you can end your AWS Glue interactive session immediately either by shutting down the Studio notebook kernel or by using the %stop_session magic.

In [None]:
%stop_session