# Amazon SageMaker Feature Store Update Feature Group

This notebook demonstrates how a Feature Group in Amazon SageMaker Feature Store can be updated to add a new feature using the new UpdateFeatureGroup API.

In [None]:
from sagemaker.feature_store.feature_group import FeatureGroup
from time import gmtime, strftime, sleep
from random import randint
import boto3
import sagemaker
import pandas as pd
import numpy as np
import logging
import random
import time
import subprocess
import sys
import importlib

In [None]:
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
logger.addHandler(logging.StreamHandler())

We make sure that sufficiently recent versions of the libraries are installed, since the UpdateFeatureGroup API is not available in older releases.

In [None]:
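# Compare numerically: as plain strings, '2.100.0' would sort before '2.48.1'.
if tuple(int(p) for p in sagemaker.__version__.split('.')[:3]) < (2, 48, 1):
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker==2.48.1'])
    importlib.reload(sagemaker)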

In [None]:
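# Compare numerically: as plain strings, '1.100.0' would sort before '1.24.23'.
if tuple(int(p) for p in boto3.__version__.split('.')[:3]) < (1, 24, 23):
    subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'boto3==1.24.23'])
    importlib.reload(boto3)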
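
Version checks like the ones above are safest when done numerically, because Python compares strings character by character. The following is a minimal sketch of that idea; `version_tuple` is a hypothetical helper written for this illustration, not part of the SageMaker SDK or boto3, and it only handles simple dotted versions.

```python
from itertools import takewhile


def version_tuple(version_string, width=3):
    """Turn '2.100.0' into (2, 100, 0), keeping only the leading digits of each part."""
    parts = []
    for piece in version_string.split(".")[:width]:
        digits = "".join(takewhile(str.isdigit, piece))
        parts.append(int(digits) if digits else 0)
    # Pad short versions such as '2.48' so tuples always have the same length.
    parts.extend([0] * (width - len(parts)))
    return tuple(parts)


# String comparison ranks '2.100.0' below '2.48.1' because '1' sorts before '4'.
lexical_says_older = "2.100.0" < "2.48.1"
# Numeric comparison sees 100 > 48 and gets it right.
numeric_says_older = version_tuple("2.100.0") < version_tuple("2.48.1")
print(lexical_says_older, numeric_says_older)
```

If the `packaging` library is available in your environment, its `packaging.version.parse` function is a more complete alternative that also understands pre-release and development suffixes.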

In [None]:
logger.info(f'Using SageMaker version: {sagemaker.__version__}')
logger.info(f'Using Pandas version: {pd.__version__}')
logger.info(f'Using boto3 version: {boto3.__version__}')

In [None]:
import pprint
pretty_printer = pprint.PrettyPrinter(indent=4)

In [None]:
sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
default_bucket = sagemaker_session.default_bucket()
logger.info(f'Default S3 bucket = {default_bucket}')
prefix = 'sagemaker-feature-store'
region = sagemaker_session.boto_region_name

In [None]:
boto_session = boto3.Session(region_name=region)
sagemaker_runtime = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)
s3 = boto_session.resource('s3')

Read the customers CSV data into a DataFrame.

In [None]:
customers_df = pd.read_csv('data/customers.csv')
customers_df.head(5)

In [None]:
customers_df.dtypes

In [None]:
current_time_sec = int(round(time.time()))

Add an event_time timestamp. This is the point in time at which a new event occurs that corresponds to the creation or update of a record in a feature group. Every record in the feature group must have a corresponding event_time.

In [None]:
customers_df['event_time'] = pd.Series([current_time_sec] * len(customers_df), dtype="float64")
customers_df.head(5)
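
As a side note on the event time format: to my understanding, Feature Store accepts the event time either as a Fractional feature holding Unix epoch seconds (the form used above) or as a String feature in ISO-8601 format. The sketch below shows both representations for the same instant; `event_time_formats` is a hypothetical helper introduced only for this illustration.

```python
from datetime import datetime, timezone


def event_time_formats(epoch_seconds):
    """Return the same instant as (Unix epoch float, ISO-8601 UTC string)."""
    as_fractional = float(epoch_seconds)
    as_iso_string = datetime.fromtimestamp(epoch_seconds, tz=timezone.utc).strftime(
        "%Y-%m-%dT%H:%M:%SZ"
    )
    return as_fractional, as_iso_string


# The Unix epoch itself, in both representations.
print(event_time_formats(0))
```

The notebook sticks with epoch seconds stored as `float64`, which keeps the column numeric and lets the feature definition be inferred as Fractional.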

In [None]:
customers_df['customer_id'] = customers_df['customer_id'].astype('string')

In [None]:
current_timestamp = strftime('%m-%d-%H-%M', gmtime())
customers_feature_group_name = f'fs-customers-{current_timestamp}'
logger.info(f'Feature group name = {customers_feature_group_name}')

In [None]:
customers_feature_group = FeatureGroup(name=customers_feature_group_name, 
 sagemaker_session=sagemaker_session)
customers_feature_group.load_feature_definitions(data_frame=customers_df)
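
`load_feature_definitions` infers a feature type for each column from its pandas dtype, which is why `customer_id` was cast to `string` beforehand. The sketch below approximates that inference so the result can be predicted before calling the SDK. The mapping table is an assumption based on the three Feature Store types (Integral, Fractional, String), not a copy of the SDK's internals, and `infer_feature_types` is a hypothetical helper.

```python
# Assumed mapping from pandas dtype names to Feature Store feature types.
DTYPE_TO_FEATURE_TYPE = {
    "int64": "Integral",
    "float64": "Fractional",
    "string": "String",
    "object": "String",
}


def infer_feature_types(column_dtypes):
    """Map {column name: dtype name} to {column name: feature type}."""
    inferred = {}
    for column, dtype_name in column_dtypes.items():
        if dtype_name not in DTYPE_TO_FEATURE_TYPE:
            raise ValueError(f"No feature type known for dtype {dtype_name!r} ({column})")
        inferred[column] = DTYPE_TO_FEATURE_TYPE[dtype_name]
    return inferred


# In the notebook, the input could be built with:
#   {name: str(dtype) for name, dtype in customers_df.dtypes.items()}
example = infer_feature_types({"customer_id": "string", "event_time": "float64"})
print(example)
```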

In [None]:
def wait_for_feature_group_creation_complete(feature_group):
    status = feature_group.describe().get('FeatureGroupStatus')
    print(f'Initial status: {status}')
    while status == 'Creating':
        logger.info(f'Waiting for feature group: {feature_group.name} to be created ...')
        time.sleep(5)
        status = feature_group.describe().get('FeatureGroupStatus')
    if status != 'Created':
        raise SystemExit(f'Failed to create feature group {feature_group.name}: {status}')
    logger.info(f'FeatureGroup {feature_group.name} was successfully created.')

Create the customers feature group.

In [None]:
customers_feature_group.create(s3_uri=f's3://{default_bucket}/{prefix}', 
 record_identifier_name='customer_id', 
 event_time_feature_name='event_time', 
 role_arn=role, 
 enable_online_store=True)

In [None]:
wait_for_feature_group_creation_complete(customers_feature_group)

In [None]:
describe_feature_group_result = sagemaker_runtime.describe_feature_group(
 FeatureGroupName=customers_feature_group_name
)
pretty_printer.pprint(describe_feature_group_result)

Ingest the initial data frame into the feature group. Records are available in the online store right away, but they may take some time to appear in the offline store: data is buffered, batched, and written to Amazon S3 within 15 minutes.

In [None]:
%%time
logger.info(f'Ingesting data into feature group: {customers_feature_group.name} ')
customers_feature_group.ingest(data_frame=customers_df, 
 max_workers=3, 
 wait=True)

In [None]:
customer_id = f'C{randint(1, 500)}'
logger.info(f'customer_id={customer_id}') 

Verify a record in the online feature store

In [None]:
feature_record = featurestore_runtime.get_record(FeatureGroupName=customers_feature_group_name, 
 RecordIdentifierValueAsString=customer_id)
feature_record
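
The `get_record` response lists features as name/value pairs under a `Record` key, with every value returned as a string. A small sketch for turning that list into a dictionary is shown below; `record_to_dict` is a hypothetical helper, and the sample response is made-up data that only mimics the response shape.

```python
def record_to_dict(get_record_response):
    """Flatten the 'Record' list of a get_record response into {name: value}."""
    features = get_record_response.get("Record", [])
    return {item["FeatureName"]: item["ValueAsString"] for item in features}


# Made-up response with the same shape as the real one.
sample_response = {
    "Record": [
        {"FeatureName": "customer_id", "ValueAsString": "C42"},
        {"FeatureName": "event_time", "ValueAsString": "1656633600.0"},
    ]
}
print(record_to_dict(sample_response))
```

In the notebook this would be called as `record_to_dict(feature_record)`. An empty dictionary would indicate that no record was returned for the given identifier.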

Let us run some Athena queries to verify the offline feature store data.

In [None]:
customers_query = customers_feature_group.athena_query()
customers_table = customers_query.table_name

In [None]:
output_location = f's3://{default_bucket}/{prefix}/query_results/'

In [None]:
query_string = f'SELECT * FROM "{customers_table}" limit 10'

Now we run the query, load the first 10 rows into a DataFrame, and explore them.

In [None]:
customers_query.run(query_string=query_string,output_location=output_location)
customers_query.wait()
athena_df = customers_query.as_dataframe()
athena_df.head()

#### Query in Athena console

If this is the first time you are launching Athena in the AWS console, click the `Get Started` button, and then set up a query results location in Amazon S3 before running the first query.

After setting the query results location, select `AwsDataCatalog` as the Data source and `sagemaker_featurestore` as the Database in the left panel.

We can now run a query against the offline feature store data in Athena. To select entries from the customers feature group, use the following SQL query. You will need to replace the customers table name with the corresponding value created here in the notebook.

```sql
select * from ""
limit 10
```

![Athena Query](./images/athena-query.png "Athena Query")

#### Do not proceed until all 500 records are ingested into the feature store. As mentioned before, this may take some time since data is buffered, batched, and written to the offline store (Amazon S3) within 15 minutes.
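
One way to check progress is to count the distinct customers that have reached the offline store. The sketch below only builds the query string; `count_records_query` and the `record_count` alias are names chosen for this illustration, and the commented lines show how it might be run with the `customers_query` object created earlier.

```python
def count_records_query(table_name, id_column="customer_id"):
    """Build an Athena query that counts distinct record identifiers in a table."""
    return f'SELECT COUNT(DISTINCT "{id_column}") AS record_count FROM "{table_name}"'


# Possible usage in this notebook (not executed here):
#   customers_query.run(query_string=count_records_query(customers_table),
#                       output_location=output_location)
#   customers_query.wait()
#   ingested = int(customers_query.as_dataframe()['record_count'][0])
print(count_records_query("my_table"))
```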

The sample product set is spread across different categories: baby products, candies, cleaning products, and so on. Let us assume that whether a customer has kids is a strong indicator of whether they buy baby and kids products. Let's go ahead and modify the customers feature group to add this new feature.

In [None]:
sagemaker_runtime.update_feature_group(
 FeatureGroupName=customers_feature_group_name,
 FeatureAdditions=[
 {"FeatureName": "has_kids", "FeatureType": "Integral"}
 ])

When the update_feature_group API is invoked, the control plane reflects the schema change immediately, but the data plane can take up to 5 minutes to pick up the new feature group schema. Allow enough time for the update to complete before proceeding to data ingestion.

In [None]:
time.sleep(60)

In [None]:
describe_feature_group_result = sagemaker_runtime.describe_feature_group(
 FeatureGroupName=customers_feature_group_name
)
pretty_printer.pprint(describe_feature_group_result)
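
Instead of sleeping for a fixed interval, the update status can be polled. To my understanding, the `describe_feature_group` response includes a `LastUpdateStatus` entry whose `Status` is `InProgress`, `Successful`, or `Failed`; the sketch below relies on that assumption. `wait_for_update` is a hypothetical helper that takes any zero-argument callable returning the describe response, so it can be tried without AWS access.

```python
import time


def wait_for_update(describe, poll_seconds=15, timeout_seconds=300, sleep=time.sleep):
    """Poll describe() until LastUpdateStatus is Successful, or raise on failure/timeout."""
    waited = 0
    while True:
        last_update = describe().get("LastUpdateStatus", {})
        status = last_update.get("Status")
        if status == "Successful":
            return status
        if status == "Failed":
            raise RuntimeError(f"Update failed: {last_update.get('FailureReason')}")
        if waited >= timeout_seconds:
            raise TimeoutError(f"Update still {status} after {waited} seconds")
        sleep(poll_seconds)
        waited += poll_seconds


# Possible usage in this notebook (not executed here):
#   wait_for_update(lambda: sagemaker_runtime.describe_feature_group(
#       FeatureGroupName=customers_feature_group_name))
```

Because the data plane can lag behind the control plane, a successful status here does not guarantee that ingestion of the new feature will work immediately.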

In [None]:
customers_query.run(query_string=query_string,output_location=output_location)
customers_query.wait()
athena_df_update = customers_query.as_dataframe()
athena_df_update.head()

In [None]:
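# drop() returns a new DataFrame, so assign the result back; event_time is re-added below.
customers_df = customers_df.drop(columns=['event_time'])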

We randomly generate 0 or 1 for the "has_kids" feature and ingest the result into the feature group.

In [None]:
customers_df['has_kids'] = np.random.randint(0, 2, customers_df.shape[0])
customers_df.dtypes

In [None]:
customers_df['event_time'] = pd.Series([current_time_sec] * len(customers_df), dtype="float64")
customers_df.head(10)

Ingest the updated data into the feature group. If the ingest operation throws errors about the feature not being present in the feature group, wait for the update operation executed previously to finish, and then run the ingest again.

In [None]:
%%time
logger.info(f'Ingesting data into feature group: {customers_feature_group.name} ...')
customers_feature_group.ingest(data_frame=customers_df, max_workers=3, wait=True)
logger.info(f'{len(customers_df)} customer records ingested into feature group: {customers_feature_group.name}')
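
The "wait and run the ingest again" advice can be automated with a small retry loop. The sketch below is a generic pattern, not part of the SageMaker SDK; `retry` is a hypothetical helper, and catching every `Exception` is a simplification that you may want to narrow to the specific ingestion error you observe.

```python
import time


def retry(operation, attempts=5, delay_seconds=60, sleep=time.sleep):
    """Call operation() until it succeeds, waiting between failed attempts."""
    for attempt in range(1, attempts + 1):
        try:
            return operation()
        except Exception as error:
            if attempt == attempts:
                raise  # Out of attempts: surface the last error to the caller.
            print(f"Attempt {attempt} failed ({error}); retrying in {delay_seconds}s")
            sleep(delay_seconds)


# Possible usage in this notebook (not executed here):
#   retry(lambda: customers_feature_group.ingest(
#       data_frame=customers_df, max_workers=3, wait=True))
```

With the default settings the loop waits up to four minutes in total before the fifth and final attempt, which roughly matches the propagation window mentioned earlier.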

In [None]:
get_record_result = featurestore_runtime.get_record(
 FeatureGroupName=customers_feature_group_name,
 RecordIdentifierValueAsString=customer_id
)
pretty_printer.pprint(get_record_result)

Let us rerun the Athena query to verify the data for this new feature. The new rows may take some time to appear, since data is buffered, batched, and written to the offline store (Amazon S3) within 15 minutes.

In [None]:
customers_query.run(query_string=query_string,output_location=output_location)
customers_query.wait()
athena_df_update = customers_query.as_dataframe()
athena_df_update.head()

Verify via Athena console that data has been added. You will see two rows for each customer record, one that has no value for "has_kids" and one that has a value.

![Athena Query Final](./images/athena-query-final.png "Athena Query Final")
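
Because the offline store is append-only, both versions of each record remain, and downstream code usually wants only the latest one. The sketch below keeps the most recent row per customer, ordering by `event_time` and then by `write_time`. It assumes the offline table exposes a `write_time` column and that its values sort chronologically; `latest_per_key` is a hypothetical helper, and it works on plain dictionaries so that it runs without pandas.

```python
def latest_per_key(rows, key="customer_id", order_by=("event_time", "write_time")):
    """Keep, for each key, the row with the greatest order_by values."""
    latest = {}
    for row in rows:
        rank = tuple(row[column] for column in order_by)
        current = latest.get(row[key])
        # Later rows win ties, so re-ingested data replaces the older version.
        if current is None or rank >= tuple(current[column] for column in order_by):
            latest[row[key]] = row
    return list(latest.values())


# Made-up rows: same event_time, different write_time, as in this notebook.
sample_rows = [
    {"customer_id": "C1", "event_time": 1.0, "write_time": "2022-07-01 10:00:00", "has_kids": None},
    {"customer_id": "C1", "event_time": 1.0, "write_time": "2022-07-01 10:20:00", "has_kids": 1},
]
print(latest_per_key(sample_rows))
```

With a DataFrame such as `athena_df_update`, the same idea could be expressed as `sort_values(['event_time', 'write_time'])` followed by `drop_duplicates('customer_id', keep='last')`.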

### Cleanup

Now that we have seen how features can be added to feature groups, it is time to delete unwanted resources so that they do not incur charges.

Delete the S3 artifacts for the offline store

In [None]:
describe_feature_group_result = sagemaker_runtime.describe_feature_group(
 FeatureGroupName=customers_feature_group_name
)
pretty_printer.pprint(describe_feature_group_result)

In [None]:
s3_config = describe_feature_group_result['OfflineStoreConfig']['S3StorageConfig']
s3_uri = s3_config['ResolvedOutputS3Uri']
full_prefix = '/'.join(s3_uri.split('/')[3:])
logger.info(full_prefix)
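
The prefix extraction above relies on the position of the slashes in the URI. As an alternative, the standard library's `urlparse` can split an S3 URI into bucket and key prefix, which also makes it possible to read the bucket from the URI rather than assuming it is the default bucket. `split_s3_uri` is a hypothetical helper, and the URI in the example is made up.

```python
from urllib.parse import urlparse


def split_s3_uri(s3_uri):
    """Split 's3://bucket/some/prefix' into ('bucket', 'some/prefix')."""
    parsed = urlparse(s3_uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {s3_uri!r}")
    return parsed.netloc, parsed.path.lstrip("/")


# Made-up URI for illustration only.
bucket_name, key_prefix = split_s3_uri("s3://example-bucket/sagemaker-feature-store/data")
print(bucket_name, key_prefix)
```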

In [None]:
bucket = s3.Bucket(default_bucket)
offline_objects = bucket.objects.filter(Prefix=full_prefix)
offline_objects.delete()

Delete the feature group

In [None]:
customers_feature_group.delete()