Copyright 2021 Amazon.com and its affiliates; all rights reserved. This file is AWS Content and may not be duplicated or distributed without permission

# Using SQL feature pipelines with Amazon SageMaker Feature Store
This notebook demonstrates how to set up a scheduled, SQL-based feature pipeline that transforms raw data and ingests it into SageMaker Feature Store. There are many ways to do this; this example takes the following approach:

- Uses a single Python function, supplied by the data scientist, that returns the SQL query for feature transformation
- Uses Amazon EventBridge for scheduling
- Uses Amazon SageMaker Pipelines for execution of the feature pipeline
- Uses an Amazon SageMaker Processing job to do the core feature transformation and ingestion work within the pipeline

The feature pipeline assumes that its target feature group already exists, so the notebook creates the feature group before scheduling the pipeline.
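The only user-supplied piece of the pipeline is a Python module that exposes `transform_query(fg_name: str) -> str`, as `utilities/customer_sql_v2.py` does later in this notebook. The `FeatureStore` helper's internals are not shown here, so the following is a minimal sketch, assuming the script is loaded by file path and then called with the feature group name. `load_transform_query` is a hypothetical name, not part of the helper.

```python
import importlib.util
from pathlib import Path
from typing import Callable


def load_transform_query(script_path: str) -> Callable[[str], str]:
    """Load a user script and return its transform_query function.

    Hypothetical helper: assumes the script defines
    transform_query(fg_name: str) -> str, like customer_sql_v2.py.
    """
    path = Path(script_path)
    spec = importlib.util.spec_from_file_location(path.stem, str(path))
    if spec is None or spec.loader is None:
        raise ImportError(f"cannot load {script_path}")
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)  # runs the script's top-level code
    fn = getattr(module, "transform_query", None)
    if not callable(fn):
        raise AttributeError(f"{script_path} does not define transform_query()")
    return fn


if __name__ == "__main__":
    import tempfile

    with tempfile.TemporaryDirectory() as tmp:
        script = Path(tmp) / "example_sql.py"
        # Write a tiny example script with the expected function contract
        script.write_text(
            "def transform_query(fg_name: str) -> str:\n"
            "    return f'SELECT * FROM {fg_name}'\n"
        )
        print(load_transform_query(str(script))("my-feature-group"))
```

Keeping the contract to a single function that returns a query string means the data scientist never touches the scheduling or ingestion code.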

### A few imports

In [None]:
from utilities.feature_store_helper import FeatureStore

import boto3
import json

from IPython.display import display, HTML, Markdown
import time
import pandas as pd


FG_NAME = 'fs-demo-2022-03-24-sql'

fs = FeatureStore()

### Read some raw customer data

In [None]:
df = pd.read_csv('utilities/customers.csv')
ORIGINAL_RECORD_COUNT = df.shape[0]
df.head()

In [None]:
fs.create_fg_from_df(FG_NAME, df, id_name='Id', event_time_name='UpdateTime')

## Create a scheduled SQL-based feature pipeline

#### First define any transformations needed from raw data to final features
For this first example, there are no feature transformations. We simply ingest the latest fully featurized data on a schedule so that it is available in both the online and offline stores.

#### Ensure the raw data is available as input for the new pipeline

In [None]:
import sagemaker
default_bucket = sagemaker.Session().default_bucket()
data_source = f's3://{default_bucket}/sagemaker-feature-store/hello-data/'

!aws s3 cp utilities/customers.csv $data_source

In [None]:
!head utilities/customers.csv

#### Now, schedule the new feature pipeline to run daily, with the first execution starting a few seconds from now

In [None]:
from datetime import datetime
# Offline store write_time values are in UTC, so record the start time in UTC as well
pipeline_start_time = str(datetime.utcnow())

In [None]:
fs.schedule_feature_pipeline(data_source, FG_NAME, script_type='pyspark_sql')
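The call above passes no `schedule`, so it presumably relies on the helper's default; the second pipeline later in this notebook passes `schedule='rate(1 day)'` explicitly. EventBridge rate expressions take a positive integer and a unit of minute(s), hour(s), or day(s), and require the singular unit when the value is 1. The builder below is a hypothetical convenience, not part of the helper, that enforces those rules before the string reaches the scheduler.

```python
_UNITS = {"minute", "hour", "day"}


def rate_expression(value: int, unit: str) -> str:
    """Build an EventBridge rate expression such as 'rate(1 day)'.

    Hypothetical helper; accepts either the singular or plural unit name.
    """
    base = unit.rstrip("s")  # 'days' -> 'day', 'minutes' -> 'minute'
    if base not in _UNITS:
        raise ValueError(f"unsupported unit: {unit!r}")
    if not isinstance(value, int) or value < 1:
        raise ValueError("value must be a positive integer")
    # EventBridge wants the singular unit for 1 and the plural otherwise
    suffix = base if value == 1 else base + "s"
    return f"rate({value} {suffix})"


if __name__ == "__main__":
    print(rate_expression(1, "day"))      # rate(1 day)
    print(rate_expression(12, "hours"))   # rate(12 hours)
```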

In [None]:
sm = boto3.client('sagemaker')
while True:
    summs = sm.list_pipeline_executions(PipelineName=f'sm-pipeline-{FG_NAME}')['PipelineExecutionSummaries']
    if len(summs) > 0:
        print(json.dumps(summs, indent=4, default=str))
        break
    time.sleep(15)
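The polling loops in this notebook have no upper bound, so a pipeline that never starts would block the notebook indefinitely. A minimal sketch of a bounded alternative follows; `wait_for` is a hypothetical helper, and the 30-minute default timeout is an arbitrary assumption.

```python
import time
from typing import Callable, Optional, TypeVar

T = TypeVar("T")


def wait_for(
    check: Callable[[], Optional[T]],
    interval_s: float = 15.0,
    timeout_s: float = 1800.0,
    sleep: Callable[[float], None] = time.sleep,
) -> T:
    """Call check() until it returns a truthy value, then return that value.

    Raises TimeoutError once timeout_s seconds (measured with time.monotonic)
    pass without success. `sleep` is injectable so the helper can be tested.
    """
    deadline = time.monotonic() + timeout_s
    while True:
        result = check()
        if result:
            return result
        if time.monotonic() >= deadline:
            raise TimeoutError(f"condition not met within {timeout_s} seconds")
        sleep(interval_s)
```

With `sm` and `FG_NAME` from the cells above, the loop could become `summs = wait_for(lambda: sm.list_pipeline_executions(PipelineName=f'sm-pipeline-{FG_NAME}')['PipelineExecutionSummaries'])`, since an empty list is falsy and a non-empty one is truthy.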

#### Once the pipeline execution has completed, you should see one more version of the features in the history
**NOTE:** New updates don't appear in the offline store until they are replicated, which typically takes a few minutes.

In [None]:
while True:
    hist_df = fs.get_historical_offline_feature_values(FG_NAME, record_ids=[2])
    rec_count = hist_df.shape[0]
    if rec_count > 0:
        sorted_df = hist_df.sort_values(by=['write_time'], ascending=[False])
        latest_write = sorted_df.iloc[0]['write_time']
        if latest_write > pipeline_start_time:
            break
        else:
            time.sleep(60)
    else:
        time.sleep(60)
hist_df
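The loop above compares `write_time` with `pipeline_start_time` as raw strings, which only works when both use the same layout. A sketch of a sturdier check parses the values first. The format handled here (naive UTC strings such as `'2022-03-24 17:05:42.123'`, optionally ending in `' UTC'`, or datetime objects) is an assumption about the helper's output, not something the notebook shows, and `has_write_after` is a hypothetical name.

```python
from datetime import datetime
from typing import Iterable


def has_write_after(write_times: Iterable[object], since: datetime) -> bool:
    """Return True if any write_time is strictly later than `since`.

    Assumes naive UTC values: strings like '2022-03-24 17:05:42.123'
    (optionally ending in ' UTC') or datetime objects. Mixing naive and
    timezone-aware datetimes would raise TypeError on comparison.
    """
    for raw in write_times:
        if isinstance(raw, datetime):
            parsed = raw
        else:
            text = str(raw).strip()
            if text.endswith(" UTC"):
                text = text[: -len(" UTC")]
            parsed = datetime.fromisoformat(text)
        if parsed > since:
            return True
    return False
```

In the notebook this might be called as `has_write_after(hist_df['write_time'], datetime.fromisoformat(pipeline_start_time))`.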

We can also see that the online store still holds the record with the newest event time (`UpdateTime` in our feature group).

In [None]:
fs.get_latest_feature_values(FG_NAME, [2])
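The offline store keeps every version of a record, while the online store keeps only the one with the newest event time, even if an older record is written later. The snippet below is a pure-Python model of that rule, for illustration only; it is not how Feature Store is implemented, and letting a write with an equal event time overwrite the stored record is an assumption of this sketch.

```python
from typing import Dict, Iterable

Record = Dict[str, object]


def apply_to_online_store(
    store: Dict[object, Record],
    records: Iterable[Record],
    id_name: str = "Id",
    event_time_name: str = "UpdateTime",
) -> Dict[object, Record]:
    """Model of online-store semantics: per record id, keep the newest event time.

    Event times are compared as ISO-8601 strings, which order correctly only
    when every value uses the same format. Ties let the later write win
    (an assumption of this sketch).
    """
    for rec in records:
        key = rec[id_name]
        current = store.get(key)
        if current is None or rec[event_time_name] >= current[event_time_name]:
            store[key] = rec
    return store
```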

## Make a new version of the feature group, adding new features

#### First we'll disable the pipeline for the original feature group. In practice, you may keep it around for some period of time and then deprecate it.

In [None]:
fs.disable_feature_pipeline(FG_NAME)

#### We'll auto-define the new schema, adding two features to the original dataframe
We update the Pandas dataframe, indicating both the **name** and the **type** of each new feature.

In [None]:
df['Persona'] = int(0)
df['NewFeature1'] = float(0.0)
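Using `int(0)` and `float(0.0)` matters because each feature's type is inferred from the column's type. Feature Store feature definitions use the types `Integral`, `Fractional`, and `String`. As a hedged illustration of the inference idea, and not the helper's actual code, the sketch below maps sample Python values to those types; `feature_type_for` and `feature_definitions` are hypothetical names.

```python
from typing import Dict, List


def feature_type_for(value: object) -> str:
    """Map a sample Python value to a Feature Store feature type name.

    bool is a subclass of int, so True/False map to 'Integral' here.
    """
    if isinstance(value, int):
        return "Integral"
    if isinstance(value, float):
        return "Fractional"
    return "String"  # everything else is stored as a string


def feature_definitions(sample_row: Dict[str, object]) -> List[Dict[str, str]]:
    """Build FeatureName/FeatureType pairs from one sample row."""
    return [
        {"FeatureName": name, "FeatureType": feature_type_for(value)}
        for name, value in sample_row.items()
    ]
```

For example, `feature_definitions({'Persona': 0, 'NewFeature1': 0.0})` yields an `Integral` and a `Fractional` definition, matching the intent of the two new columns.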

In [None]:
df

#### Now we create a new feature group with a new version, based on the updated schema

In [None]:
new_fg_name = FG_NAME + '-v2'
fs.create_fg_from_df(new_fg_name, df, id_name='Id', event_time_name='UpdateTime')
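Here the version suffix is appended by hand. If you iterate more than once, a small helper can derive the next name from the current one. This is a sketch under the assumption that versions follow the `-vN` suffix convention used above; `next_version_name` is hypothetical.

```python
import re


def next_version_name(fg_name: str) -> str:
    """Return fg_name with its '-vN' suffix incremented, or '-v2' appended.

    Hypothetical helper assuming the '-vN' naming convention.
    """
    match = re.fullmatch(r"(.*)-v(\d+)", fg_name)
    if match:
        base, version = match.group(1), int(match.group(2))
        return f"{base}-v{version + 1}"
    # An unversioned name is treated as version 1
    return f"{fg_name}-v2"
```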

In [None]:
fs.describe_feature_group(new_fg_name)['FeatureDefinitions']

#### Use a new transform function that creates the two new features based on the raw data

In [None]:
%%writefile utilities/customer_sql_v2.py

def transform_query(fg_name: str) -> str:
    return f'''
        SELECT *,
            IF (Id > 3, 0, 1) as Persona,
            (zipcode * 2) + RAND() as NewFeature1
        FROM {fg_name}
        '''


#### Locally test out the new transforms

In [None]:
import utilities.customer_sql_v2
print(utilities.customer_sql_v2.transform_query(new_fg_name))
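Printing the query shows that it renders, but does not execute it. To sanity-check the `Persona` rule without Spark, one option is to run an approximation in an in-memory SQLite database. This swaps in SQLite for Spark SQL: `IF(Id > 3, 0, 1)` is written as the portable `CASE WHEN`, the `RAND()`-based `NewFeature1` is omitted, and the table name and `persona_by_id` function are hypothetical.

```python
import sqlite3
from typing import Dict, Iterable


def persona_by_id(ids: Iterable[int]) -> Dict[int, int]:
    """Evaluate a SQLite approximation of the v2 Persona rule for some Ids.

    Hypothetical local check: CASE WHEN stands in for Spark's IF(), and
    NewFeature1 (which uses RAND()) is left out.
    """
    conn = sqlite3.connect(":memory:")
    try:
        # Quote the table name because it contains hyphens, like FG names do
        conn.execute('CREATE TABLE "fs-demo-v2" (Id INTEGER)')
        conn.executemany('INSERT INTO "fs-demo-v2" (Id) VALUES (?)', [(i,) for i in ids])
        rows = conn.execute(
            'SELECT Id, CASE WHEN Id > 3 THEN 0 ELSE 1 END AS Persona '
            'FROM "fs-demo-v2" ORDER BY Id'
        ).fetchall()
        return dict(rows)
    finally:
        conn.close()


if __name__ == "__main__":
    print(persona_by_id([1, 2, 3, 4, 5]))
```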

#### Schedule the new pipeline

In [None]:
fs.schedule_feature_pipeline(data_source, new_fg_name,
                             'utilities/customer_sql_v2.py',
                             script_type='pyspark_sql', schedule='rate(1 day)',
                             instance_type='ml.m5.4xlarge', instance_count=1)

In [None]:
sm = boto3.client('sagemaker')
while True:
    summs = sm.list_pipeline_executions(PipelineName=f'sm-pipeline-{new_fg_name}')['PipelineExecutionSummaries']
    if len(summs) > 0:
        print(json.dumps(summs, indent=4, default=str))
        break
    time.sleep(15)

#### Here's an example transform query for creating time-windowed aggregate features

In [None]:
%%writefile utilities/credit_card_sql_agg.py

def transform_query(fg_name: str) -> str:
    return f'''
        SELECT cc_num,
            COUNT(*) OVER 30day_w as trans_count_30d,
            COUNT(*) OVER day_w as trans_count_1d,
            AVG(amount) OVER 30day_w as amt_avg_30d,
            AVG(amount) OVER day_w as amt_avg_1d,
            SUM(amount) OVER 30day_w as amt_sum_30d,
            SUM(amount) OVER day_w as amt_sum_1d,
            date_format(datetime, "yyyy-MM-dd'T'HH:mm:ss.SS'Z'") as event_time
        FROM {fg_name}
        WINDOW
            30day_w AS (PARTITION BY cc_num order by cast(datetime AS timestamp)
                        RANGE INTERVAL 30 DAY PRECEDING),
            day_w AS (PARTITION BY cc_num order by cast(datetime AS timestamp)
                      RANGE INTERVAL 1 DAY PRECEDING)
        '''
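This query is only an example and is not scheduled anywhere in this notebook. To see what the `RANGE INTERVAL ... PRECEDING` windows compute, here is a pure-Python sketch of the same idea: for each transaction at time `t`, it aggregates the same card's amounts with timestamps in `[t - window, t]`, including other rows with the same timestamp, as `RANGE` framing does. `trailing_aggregates` is a hypothetical function, and its quadratic scan favors clarity over speed.

```python
from collections import defaultdict
from datetime import datetime, timedelta
from typing import Dict, List, Tuple


def trailing_aggregates(
    transactions: List[Tuple[str, datetime, float]], window: timedelta
) -> List[dict]:
    """Per card, count/sum/average amounts in the trailing window [t - window, t].

    Mirrors PARTITION BY cc_num ORDER BY time RANGE <window> PRECEDING:
    rows sharing the current timestamp are included in the frame.
    """
    by_card: Dict[str, List[Tuple[datetime, float]]] = defaultdict(list)
    for cc_num, ts, amount in transactions:
        by_card[cc_num].append((ts, amount))

    results = []
    for cc_num, rows in by_card.items():
        rows.sort(key=lambda r: r[0])
        for ts, _ in rows:
            # The frame always contains at least the current row
            in_window = [amt for t, amt in rows if ts - window <= t <= ts]
            total = sum(in_window)
            results.append({
                "cc_num": cc_num,
                "event_time": ts,
                "count": len(in_window),
                "sum": total,
                "avg": total / len(in_window),
            })
    return results
```

Calling it once with `timedelta(days=1)` and once with `timedelta(days=30)` corresponds to the `day_w` and `30day_w` windows.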


#### Show that the new features are available in the online store following the pipeline execution
The online store feature values will be available once the pipeline is completed.

In [None]:
feature_values = []
iterations = 0
while True:
    feature_values = fs.get_latest_feature_values(new_fg_name, [2])
    if len(feature_values):
        break
    else:
        if iterations == 0:
            print('Waiting for pipeline to complete...')
        iterations += 1
        time.sleep(60)
feature_values

In [None]:
fs.disable_feature_pipeline(new_fg_name)

## Clean up
Disable and remove the pipelines for both the original and the new feature group.

In [None]:
fs.disable_feature_pipeline(FG_NAME)

In [None]:
fs.remove_feature_pipeline(FG_NAME)

In [None]:
fs.remove_feature_pipeline(new_fg_name)

Delete the feature groups

In [None]:
# Uncomment to also delete the original feature group:
# fs.delete_feature_group(FG_NAME)

In [None]:
fs.delete_feature_group(new_fg_name)