Copyright 2021 Amazon.com and its affiliates; all rights reserved. This file is AWS Content and may not be duplicated or distributed without permission

# Using Python feature pipelines with Amazon SageMaker Feature Store
This notebook demonstrates how to set up a scheduled feature pipeline that
transforms raw data and ingests it into SageMaker Feature Store. There are
many ways to do this; this example takes the following approach:

- Uses a single Python function provided by the data scientist for feature transformation
- Uses Amazon EventBridge for scheduling
- Uses Amazon SageMaker Pipelines for execution of the feature pipeline
- Uses an Amazon SageMaker Processing job to do the core feature transformation and ingestion work within the pipeline

The notebook assumes that the feature group already exists.
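
Because the notebook assumes the feature group already exists, it can help to confirm that before scheduling anything. The following is a minimal sketch, not part of the `FeatureStore` helper: `feature_group_status` is a hypothetical function name, and it expects a boto3 SageMaker client to be passed in.

```python
def feature_group_status(sm_client, feature_group_name):
    """Return the feature group's status string, or None if it does not exist.

    sm_client is expected to be a boto3 SageMaker client,
    for example boto3.client('sagemaker').
    """
    try:
        response = sm_client.describe_feature_group(
            FeatureGroupName=feature_group_name
        )
    except sm_client.exceptions.ResourceNotFound:
        # The feature group has not been created in this account and region
        return None
    return response['FeatureGroupStatus']
```

In this notebook it could be called as `feature_group_status(boto3.client('sagemaker'), FG_NAME)`; a return value of `'Created'` indicates the group is ready for ingestion.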

### A few imports

In [None]:
from utilities.feature_store_helper import FeatureStore

from IPython.display import display, HTML, Markdown
import pandas as pd
import time
from sklearn.ensemble import RandomForestClassifier

FG_NAME = 'fs-demo-2022-03-24'

fs = FeatureStore()

### Read some raw customer data

In [None]:
df = pd.read_csv('utilities/customers.csv')
ORIGINAL_RECORD_COUNT = df.shape[0]
df.head()

## Create a scheduled feature pipeline

#### First define any transformations needed from raw data to final features
For this first example, we have no feature transformations. We simply ingest the latest fully featurized data on a schedule so that it is available in both our online and offline stores.

#### Ensure the raw data is available as input for the new pipeline

In [None]:
import sagemaker
default_bucket = sagemaker.Session().default_bucket()
data_source = f's3://{default_bucket}/sagemaker-feature-store/hello-data/'

!aws s3 cp utilities/customers.csv $data_source

#### Now, schedule the new feature pipeline to run daily, with the first execution starting a few seconds from now

In [None]:
from datetime import datetime
pipeline_start_time = str(datetime.now())

In [None]:
fs.schedule_feature_pipeline(data_source, FG_NAME)

In [None]:
import boto3
import json

sm = boto3.client('sagemaker')
# Poll until the schedule has triggered the first pipeline execution
while True:
    summs = sm.list_pipeline_executions(PipelineName=f'sm-pipeline-{FG_NAME}')['PipelineExecutionSummaries']
    if len(summs) > 0:
        print(json.dumps(summs, indent=4, default=str))
        break
    time.sleep(15)
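
The polling loop above runs forever if the schedule never fires. A bounded variant is sketched below; `wait_until` is a hypothetical helper, not something provided by the notebook's utilities, and the default timeout and interval are arbitrary choices.

```python
import time

def wait_until(check, timeout_s=1800, interval_s=15,
               sleep=time.sleep, clock=time.monotonic):
    """Call check() until it returns a truthy value, then return that value.

    Raises TimeoutError if the deadline passes first. The sleep and clock
    parameters exist only so the helper can be exercised without waiting.
    """
    deadline = clock() + timeout_s
    while True:
        result = check()
        if result:
            return result
        if clock() >= deadline:
            raise TimeoutError(f'condition not met within {timeout_s} seconds')
        sleep(interval_s)
```

With the client defined above, the check could be written as `wait_until(lambda: sm.list_pipeline_executions(PipelineName=f'sm-pipeline-{FG_NAME}')['PipelineExecutionSummaries'])`.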

#### Once the pipeline execution has completed, you should see one more version of the features in the history
**NOTE:** The new updates do not appear in the offline store until they are replicated, which takes a few minutes.

In [None]:
# Poll the offline store until a record written after the pipeline start appears
while True:
    hist_df = fs.get_historical_offline_feature_values(FG_NAME, record_ids=[2])
    rec_count = hist_df.shape[0]
    if rec_count > 0:
        sorted_df = hist_df.sort_values(by=['write_time'], ascending=[False])
        latest_write = sorted_df.iloc[0]['write_time']
        if latest_write > pipeline_start_time:
            break
    time.sleep(60)
hist_df
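
The loop above compares `write_time` against `str(datetime.now())`, which is a string comparison against local time. If the notebook's clock is not set to UTC, a timezone-aware comparison may be safer. The sketch below assumes `write_time` arrives as an ISO 8601 style string and that values without an offset are in UTC; both are assumptions about the helper's output, and `written_after` is a hypothetical name.

```python
from datetime import datetime, timezone

def written_after(write_time, start_time):
    """Return True if write_time (a string) is later than start_time (an aware datetime)."""
    parsed = datetime.fromisoformat(write_time)
    if parsed.tzinfo is None:
        # Assumption: timestamps without an offset are expressed in UTC
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed > start_time

# Fixed example value; in the notebook this would be datetime.now(timezone.utc)
pipeline_start = datetime(2022, 3, 24, 12, 0, 0, tzinfo=timezone.utc)

print(written_after('2022-03-24 12:05:00', pipeline_start))
print(written_after('2022-03-24T11:59:59+00:00', pipeline_start))
```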

And we can see that the online store still has the value with the newest event time (updatetime in our feature group)

In [None]:
fs.get_latest_feature_values(FG_NAME, [2])

## Make a new version of the feature group, adding new features

#### First we'll disable the pipeline for the original feature group. In practice, you may keep it around for some period of time and then deprecate it.

In [None]:
fs.disable_feature_pipeline(FG_NAME)

#### We'll auto-define the new schema, adding two features to the original dataframe
We update the Pandas dataframe, indicating both the **name** and the **type** of each new feature.

In [None]:
df['Persona'] = int(0)
df['NewFeature1'] = float(0.0)

In [None]:
df
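
To illustrate how a schema can be derived from a dataframe, the sketch below maps pandas dtypes to the three Feature Store feature types (`Integral`, `Fractional`, `String`). It is an illustration only and not necessarily how `fs.create_fg_from_df` works; `feature_definitions_from_df` is a hypothetical name and the sample values are made up.

```python
import pandas as pd

def feature_definitions_from_df(df):
    """Build a FeatureDefinitions-style list from the dataframe's column dtypes."""
    definitions = []
    for name, dtype in df.dtypes.items():
        if pd.api.types.is_integer_dtype(dtype):
            feature_type = 'Integral'
        elif pd.api.types.is_float_dtype(dtype):
            feature_type = 'Fractional'
        else:
            # Anything else (strings, timestamps as text, ...) is stored as String
            feature_type = 'String'
        definitions.append({'FeatureName': name, 'FeatureType': feature_type})
    return definitions

# Hypothetical sample data standing in for the customer dataframe
sample = pd.DataFrame({'Id': [1, 2], 'ZipCode': [10001, 10002]})
sample['Persona'] = int(0)          # integer placeholder -> Integral
sample['NewFeature1'] = float(0.0)  # float placeholder -> Fractional

for definition in feature_definitions_from_df(sample):
    print(definition)
```

This is why the placeholder values above are wrapped in `int(...)` and `float(...)`: the dtype of the new column determines the feature type in the new schema.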

#### Now we create a new feature group with a new version, based on the updated schema

In [None]:
new_fg_name = FG_NAME + '-v2'
fs.create_fg_from_df(new_fg_name, df)

In [None]:
fs.describe_feature_group(new_fg_name)['FeatureDefinitions']

#### Use a new Python transform function that creates the two new features based on the raw data

In [None]:
%%writefile utilities/customer_v2.py

import pandas as pd
import numpy as np

def choose_persona(row):
    # Ids above 3 map to persona 0; all others map to persona 1
    if row['Id'] > 3:
        return 0
    else:
        return 1

def apply_transforms(df: pd.DataFrame) -> pd.DataFrame:
    df['Persona'] = df.apply(choose_persona, axis=1)
    # np.random.rand() returns a single scalar, shared by every row
    df['NewFeature1'] = df['Persona'] * np.random.rand() + (2 * df['ZipCode'])
    return df


#### Locally test out the new transforms

In [None]:
import utilities.customer_v2
df = utilities.customer_v2.apply_transforms(df)
df
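
When testing locally, the row-wise `apply` can also be checked against a vectorized equivalent, which tends to be faster on large dataframes. The following is a sketch on made-up `Id` values; `persona_vectorized` is a hypothetical name and is not part of `customer_v2.py`.

```python
import numpy as np
import pandas as pd

def choose_persona(row):
    # Same rule as in the transform script: Ids above 3 map to 0, others to 1
    return 0 if row['Id'] > 3 else 1

def persona_vectorized(df):
    """Compute the same persona rule for all rows at once."""
    return pd.Series(np.where(df['Id'] > 3, 0, 1), index=df.index)

# Hypothetical sample Ids
sample = pd.DataFrame({'Id': [1, 2, 3, 4, 5]})
row_wise = sample.apply(choose_persona, axis=1)
vectorized = persona_vectorized(sample)

print(row_wise.tolist())
print(vectorized.tolist())
```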

#### Schedule the new pipeline hourly

In [None]:
fs.schedule_feature_pipeline(data_source, new_fg_name, 
 'utilities/customer_v2.py', schedule='rate(1 hour)')
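
The `schedule` value above follows the EventBridge rate expression syntax, where the unit is singular for a value of 1 and plural otherwise. Assuming the helper passes the string through to EventBridge unchanged, a small hypothetical function such as `rate_expression` below avoids malformed expressions like `rate(1 hours)`.

```python
def rate_expression(value, unit):
    """Build an EventBridge rate expression such as 'rate(1 hour)'.

    unit is given in singular form: 'minute', 'hour', or 'day'.
    """
    if unit not in ('minute', 'hour', 'day'):
        raise ValueError(f'unsupported unit: {unit}')
    if value < 1:
        raise ValueError('value must be a positive integer')
    suffix = '' if value == 1 else 's'
    return f'rate({value} {unit}{suffix})'

print(rate_expression(1, 'hour'))
print(rate_expression(5, 'minute'))
```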

#### Show that the new features are available in the online store following the pipeline execution
The online store feature values will be available once the pipeline is completed.

In [None]:
feature_values = []
iterations = 0
while True:
    feature_values = fs.get_latest_feature_values(new_fg_name, [2])
    if len(feature_values):
        break
    else:
        if iterations == 0:
            print('Waiting for record to be available in online store..')
        iterations += 1
        time.sleep(60)
feature_values

In [None]:
fs.disable_feature_pipeline(new_fg_name)

## Update the pipeline to use a new implementation of the feature transformation script, to improve the features or fix bugs

In [None]:
%%writefile utilities/customer_v1_bugfix.py

import pandas as pd
import numpy as np

def choose_persona(row):
    # Ids above 3 map to persona 0; all others map to persona 1
    if row['Id'] > 3:
        return 0
    else:
        return 1

def apply_transforms(df: pd.DataFrame) -> pd.DataFrame:
    df['Persona'] = df.apply(choose_persona, axis=1)
    # Bug fix: ZipCode is no longer doubled
    df['NewFeature1'] = df['Persona'] * np.random.rand() + df['ZipCode']
    return df


#### Try out the updated transforms locally

In [None]:
import utilities.customer_v1_bugfix
df = utilities.customer_v1_bugfix.apply_transforms(df)
df

#### Now, update the pipeline to have the new transforms

In [None]:
fs.update_feature_pipeline(data_source, new_fg_name, 
 'utilities/customer_v1_bugfix.py', instance_count=1)

#### Perform a full backfill from raw data through the updated transforms
This demonstrates that the bug fix worked, and that any subsequent use of this feature group for training or for inference will get the fresh and correct feature values, using a consistent feature implementation.

Once the pipeline has completed executing, you should see that records returned from the online store have values in `NewFeature1` that no longer look like a multiple of the `ZipCode`.

In [None]:
feature_values = []
iterations = 0
while True:
    feature_values = fs.get_latest_feature_values(new_fg_name, [2])
    # Done once the record exists and NewFeature1 reflects the fixed transform
    if len(feature_values) and feature_values[0]['NewFeature1'] <= 22222.0:
        break
    if iterations == 0:
        iterations += 1
        print('Waiting for pipeline to complete...')
    time.sleep(60)
feature_values
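
The fixed threshold in the loop above depends on the `ZipCode` of the record being checked. The same test can be expressed relative to `ZipCode`: since `Persona` is 0 or 1 and `np.random.rand()` lies in [0, 1), the fixed transform yields a value in [ZipCode, ZipCode + 1), while the original transform yields a value in [2 * ZipCode, 2 * ZipCode + 1). The sketch below uses a hypothetical `looks_fixed` helper and a made-up `ZipCode` of 11111.

```python
def looks_fixed(new_feature_1, zip_code):
    """Return True if NewFeature1 is consistent with the fixed transform.

    Assumes zip_code >= 1, so that zip_code + 1 <= 2 * zip_code and the
    two value ranges cannot overlap.
    """
    return new_feature_1 < 2 * zip_code

# Hypothetical values for a record whose ZipCode is 11111
print(looks_fixed(11111.4, 11111))  # value produced by the fixed transform
print(looks_fixed(22222.4, 11111))  # value produced by the original transform
```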

#### Show that we have multiple versions of the feature records in the offline store, including the original values and now the fixed values
Allow a few minutes for the offline store to be updated. Once two different records are returned for this record ID, the update is complete. You'll see an old version with the original feature value and a new version, with a more recent `write_time`, that has the corrected value.


In [None]:
while True:
    hist_df = fs.get_historical_offline_feature_values(new_fg_name, record_ids=[2])
    rec_count = hist_df.shape[0]
    if rec_count > 1:
        break
    else:
        time.sleep(60)
hist_df

#### Show that the latest offline store feature values have the corrected values

In [None]:
fs.get_latest_offline_feature_values(new_fg_name, record_ids=[2])
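
Conceptually, the "latest" offline values are the history with all but the most recent row per record ID removed. The pandas sketch below shows one way to do that; it is not necessarily how `fs.get_latest_offline_feature_values` is implemented, `latest_per_record` is a hypothetical name, and the rows are made-up data using the column names seen in this notebook.

```python
import pandas as pd

def latest_per_record(history, id_col='Id',
                      order_cols=('updatetime', 'write_time')):
    """Keep only the newest row per record ID, ordered by event time, then write time."""
    ordered = history.sort_values(list(order_cols))
    latest = ordered.drop_duplicates(subset=[id_col], keep='last')
    return latest.sort_values(id_col).reset_index(drop=True)

# Hypothetical history: record 2 was written twice with the same event time
history = pd.DataFrame({
    'Id': [2, 2, 3],
    'NewFeature1': [200.5, 100.5, 300.5],
    'updatetime': ['2022-03-24T00:00:00Z'] * 3,
    'write_time': ['2022-03-24 10:00:00', '2022-03-24 11:00:00',
                   '2022-03-24 10:00:00'],
})

latest = latest_per_record(history)
print(latest[['Id', 'NewFeature1', 'write_time']])
```

Ordering by event time first and write time second means a backfill with an unchanged event time still supersedes the earlier row, which matches the behavior described above.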

## Clean up
Remove the pipeline for the first feature group, and the one for the new feature group.

In [None]:
fs.remove_feature_pipeline(FG_NAME)

In [None]:
fs.remove_feature_pipeline(new_fg_name)

Optionally, delete the feature groups by uncommenting and running the following cells.

In [None]:
# fs.delete_feature_group(FG_NAME)

In [None]:
# fs.delete_feature_group(new_fg_name)