# MLOps: Initial - Enable experimentation at scale

## Contents

- [Introduction](#Introduction)
- [Processing our dataset locally](#Processing-our-dataset-locally)
- [SageMaker Processing](#SageMaker-Processing)
- [SageMaker Feature Store](#SageMaker-Feature-Store)

## Introduction

This is our first notebook, which explores the data preparation stage of the ML workflow.

Here, we will put on the hat of a `Data Scientist`/`Data Engineer` and gather datasets and pre-process them to match our upcoming training needs. As part of this exercise, we will start by performing these steps manually inside the notebook's local environment. Then we will learn how to scale these steps using managed SageMaker Processing capabilities. In the last step, we will save the outcomes of our data processing to SageMaker Feature Store.

![Notebook1](images/notebook-1.png)

To enable you to run these notebooks within a reasonable time (typically less than an hour), the use case is a straightforward regression task: predicting house prices based on a synthetic housing dataset. This dataset contains 8 housing features, such as year built, number of bedrooms, and lot size.

To begin, we'll import some necessary packages and set up directories for local training and test data. We'll also set up a SageMaker Session to perform various operations, and specify an Amazon S3 bucket to hold input data and output. The default bucket used here is created by SageMaker if it doesn't already exist, and named in accordance with the AWS account ID and AWS Region.

Let's get started!

**Imports**

In [None]:
%pip install -U sagemaker

In [None]:
import sagemaker
import boto3
import pandas as pd
from time import strftime
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler
import os
import time
from sagemaker.feature_store.feature_group import FeatureGroup

**Session variables**

In [None]:
local_data_path = "data/raw/house_pricing.csv"

# train, validation and test sizes (floats here; converted to strings when passed as SageMaker job arguments)
train_size = 0.6
val_size = 0.2
test_size = 0.2
random_seed = 42 # fixed random seed to ensure reproducible results over multiple executions

# Useful SageMaker variables
sess = sagemaker.Session()
bucket = sess.default_bucket()
role_arn = sagemaker.get_execution_role()
region = sess.boto_region_name

# Local data paths
pipeline_scripts_dir = os.path.join(os.getcwd(), 'pipeline_scripts')
os.makedirs(pipeline_scripts_dir, exist_ok=True)

processed_dir = os.path.join(os.getcwd(), 'data/processed')
os.makedirs(processed_dir, exist_ok=True)

processed_train_dir = os.path.join(os.getcwd(), 'data/processed/train')
os.makedirs(processed_train_dir, exist_ok=True)

processed_validation_dir = os.path.join(os.getcwd(), 'data/processed/validation')
os.makedirs(processed_validation_dir, exist_ok=True)

processed_test_dir = os.path.join(os.getcwd(), 'data/processed/test')
os.makedirs(processed_test_dir, exist_ok=True)

script_processed_dir = os.path.join(os.getcwd(), 'data/script_processed')
os.makedirs(script_processed_dir, exist_ok=True)

script_processed_train_dir = os.path.join(os.getcwd(), 'data/script_processed/train')
os.makedirs(script_processed_train_dir, exist_ok=True)

script_processed_validation_dir = os.path.join(os.getcwd(), 'data/script_processed/validation')
os.makedirs(script_processed_validation_dir, exist_ok=True)

script_processed_test_dir = os.path.join(os.getcwd(), 'data/script_processed/test')
os.makedirs(script_processed_test_dir, exist_ok=True)

sm_processed_dir = os.path.join(os.getcwd(), 'data/sm_processed')
os.makedirs(sm_processed_dir, exist_ok=True)

sm_processed_train_dir = os.path.join(os.getcwd(), 'data/sm_processed/train')
os.makedirs(sm_processed_train_dir, exist_ok=True)

sm_processed_validation_dir = os.path.join(os.getcwd(), 'data/sm_processed/validation')
os.makedirs(sm_processed_validation_dir, exist_ok=True)

sm_processed_test_dir = os.path.join(os.getcwd(), 'data/sm_processed/test')
os.makedirs(sm_processed_test_dir, exist_ok=True)

# Data paths in S3
s3_prefix = 'mlops-workshop'

# SageMaker Processing variables
processing_instance_type = 'ml.m5.large'
processing_instance_count = 1
output_path = f's3://{bucket}/{s3_prefix}/data/sm_processed'

## Processing our dataset locally

**Reading Data**

First, we'll read our dataset from the CSV file.

In [None]:
# reading data
df = pd.read_csv(local_data_path)
df

**Changing target to the first column in the DataFrame**

Some SageMaker built-in algorithms for CSV training assume that the target variable is in the first column and that the CSV does not have a header record. Even though we'll be bringing our own model, let's make this change now so that we can easily switch to a built-in algorithm later.

In [None]:
# remove the target column 'PRICE' from its current position...
first_column = df.pop('PRICE')

# ...and re-insert it at position 0, so that it becomes the first column
df.insert(0, 'PRICE', first_column)
df

**Splitting and Scaling data**

Now let's process our data for a Machine Learning model.

First we will split the data into train, validation and test datasets.

In [None]:
# splitting data
rest_size = 1.0 - train_size
df_train, df_rest = train_test_split(
 df,
 test_size=rest_size,
 train_size=train_size,
 random_state=random_seed
)
df_val, df_test = train_test_split(
 df_rest,
 test_size=(test_size / rest_size),
 train_size=(val_size / rest_size),
 random_state=random_seed
)
df_train.reset_index(inplace=True, drop=True)
df_val.reset_index(inplace=True, drop=True)
df_test.reset_index(inplace=True, drop=True)
train_perc = int((len(df_train)/len(df)) * 100)
print(f"Training size: {len(df_train)} - {train_perc}% of total")
val_perc = int((len(df_val)/len(df)) * 100)
print(f"Val size: {len(df_val)} - {val_perc}% of total")
test_perc = int((len(df_test)/len(df)) * 100)
print(f"Test size: {len(df_test)} - {test_perc}% of total")
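
The fractions passed to the second `train_test_split` call can be checked by hand. The sketch below is only an illustration and not part of the workshop code; `second_split_fractions` is a hypothetical helper name. It shows that with a 60/20/20 split, the remaining 40% of the rows has to be divided 50/50 to end up with 20% validation and 20% test data relative to the full dataset.

```python
def second_split_fractions(train_size, val_size, test_size):
    """Return (val_fraction, test_fraction) relative to the non-training remainder."""
    # share of the data that is left after the training rows are taken out
    rest_size = 1.0 - train_size
    # the second split works on the remainder only, so sizes must be rescaled
    return val_size / rest_size, test_size / rest_size


val_frac, test_frac = second_split_fractions(0.6, 0.2, 0.2)
print(f"second split -> validation: {val_frac:.2f}, test: {test_frac:.2f}")
```

Passing the unscaled values (0.2 and 0.2) to the second call would instead keep only 20% of the remainder for each set, which is 8% of the full dataset.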

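Next, we scale the data using statistics computed on the training dataset only. The `PRICE` target is restored to its original, unscaled values after the transformation.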

In [None]:
# scaling data
scaler_data = StandardScaler()
 
# fit scaler to training dataset
print("Fitting scaling to training data and transforming dataset...")
df_train_transformed = pd.DataFrame(
 scaler_data.fit_transform(df_train), 
 columns=df_train.columns
)
df_train_transformed['PRICE'] = df_train['PRICE']
df_train_transformed.head()

We then apply this scaling to the validation dataset.

In [None]:
# apply scaler to validation dataset
print("Transforming validation dataset...")
df_val_transformed = pd.DataFrame(
 scaler_data.transform(df_val), 
 columns=df_val.columns
)
df_val_transformed['PRICE'] = df_val['PRICE']
df_val_transformed.head()

Finally, we apply the same scaling to the test dataset.

In [None]:
# apply scaler to test dataset
print("Transforming test dataset...")
df_test_transformed = pd.DataFrame(
 scaler_data.transform(df_test), 
 columns=df_test.columns
)
df_test_transformed['PRICE'] = df_test['PRICE']
df_test_transformed.head()
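
To make the fit/transform distinction concrete, here is a minimal pure-Python sketch of standard scaling on a single column. It is an illustration under assumptions rather than the scikit-learn implementation: `fit_standard_scaler` and `transform` are hypothetical helper names, and the sample values are made up. Like `StandardScaler`, it uses the population standard deviation. The mean and standard deviation come from the training values only and are then reused for the validation values.

```python
import math
from statistics import fmean, pstdev


def fit_standard_scaler(values):
    """Compute the mean and population standard deviation of the training values."""
    mean = fmean(values)
    std = pstdev(values)
    # guard against constant columns to avoid dividing by zero
    return mean, (std if std > 0 else 1.0)


def transform(values, mean, std):
    """Scale values with statistics that were fitted elsewhere."""
    return [(v - mean) / std for v in values]


train = [1.0, 2.0, 3.0]   # made-up training column
val = [4.0]               # made-up validation column

mean, std = fit_standard_scaler(train)       # fit on training data only
train_scaled = transform(train, mean, std)
val_scaled = transform(val, mean, std)       # reuse the training statistics

print(train_scaled, val_scaled)
```

Fitting on the training data only keeps information from the validation and test sets out of the preprocessing step, so the scaled validation values are not guaranteed to have zero mean.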

Awesome. Let's save the processed datasets locally so that we can later compare them with the output of our preprocessing scripts.

In [None]:
# Save data locally
df_train_transformed.to_csv(processed_train_dir+'/train.csv', sep=',', index=False, header=False)
df_val_transformed.to_csv(processed_validation_dir+'/validation.csv', sep=',', index=False, header=False)
df_test_transformed.to_csv(processed_test_dir+'/test.csv', sep=',', index=False, header=False)

Now that we've verified our processing code works, let's write it out to a file and run it against our local raw data.

In [None]:
%%writefile ./local_preprocessing.py

import pandas as pd
import argparse
import os
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split


def read_parameters():
 """
 Read job parameters
 Returns:
 (Namespace): read parameters
 """
 parser = argparse.ArgumentParser()
 parser.add_argument('--train_size', type=float, default=0.6)
 parser.add_argument('--val_size', type=float, default=0.2)
 parser.add_argument('--test_size', type=float, default=0.2)
 parser.add_argument('--random_state', type=int, default=42)
 parser.add_argument('--target_col', type=str, default='PRICE')
 params, _ = parser.parse_known_args()
 return params


def change_target_to_first_col(df, target_col):
 # shift column 'PRICE' to first position
 first_column = df.pop(target_col)
 
 # insert column using insert(position,column_name,
 # first_column) function
 df.insert(0, target_col, first_column)
 return df


def split_dataset(df, train_size, val_size, test_size, random_state=None):
 """
 Split dataset into train, validation and test samples
 Args:
 df (pandas.DataFrame): input data
 train_size (float): ratio of data to use as training dataset
 val_size (float): ratio of data to use as validation dataset
 test_size (float): ratio of data to use as test dataset
 random_state (int): Pass an int for reproducible output across multiple function calls.
 Returns:
 df_train (pandas.DataFrame): train dataset
 df_val (pandas.DataFrame): validation dataset
 df_test (pandas.DataFrame): test dataset
 """
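 # compare with a tolerance: float sums can miss 1.0 by a rounding error (e.g. 0.1 + 0.2 != 0.3)
 if abs(train_size + val_size + test_size - 1.0) > 1e-9:
     raise ValueError("train_size, val_size and test_size must sum up to 1.0")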
 rest_size = 1 - train_size
 df_train, df_rest = train_test_split(
 df,
 test_size=rest_size,
 train_size=train_size,
 random_state=random_state
 )
 df_val, df_test = train_test_split(
 df_rest,
 test_size=(test_size / rest_size),
 train_size=(val_size / rest_size),
 random_state=random_state
 )
 df_train.reset_index(inplace=True, drop=True)
 df_val.reset_index(inplace=True, drop=True)
 df_test.reset_index(inplace=True, drop=True)
 train_perc = int((len(df_train)/len(df)) * 100)
 print(f"Training size: {len(df_train)} - {train_perc}% of total")
 val_perc = int((len(df_val)/len(df)) * 100)
 print(f"Val size: {len(df_val)} - {val_perc}% of total")
 test_perc = int((len(df_test)/len(df)) * 100)
 print(f"Test size: {len(df_test)} - {test_perc}% of total")
 return df_train, df_val, df_test


def scale_dataset(df_train, df_val, df_test, target_col):
 """
 Fit StandardScaler to df_train and apply it to df_val and df_test
 Args:
 df_train (pandas.DataFrame): train dataset
 df_val (pandas.DataFrame): validation dataset
 df_test (pandas.DataFrame): test dataset
 target_col (str): target col
 Returns:
 df_train_transformed (pandas.DataFrame): train data scaled
 df_val_transformed (pandas.DataFrame): val data scaled
 df_test_transformed (pandas.DataFrame): test data scaled
 """
 scaler_data = StandardScaler()
 
 # fit scaler to training dataset
 print("Fitting scaling to training data and transforming dataset...")
 df_train_transformed = pd.DataFrame(
 scaler_data.fit_transform(df_train), 
 columns=df_train.columns
 )
 df_train_transformed[target_col] = df_train[target_col]
 
 # apply scaler to validation and test datasets
 print("Transforming validation and test datasets...")
 df_val_transformed = pd.DataFrame(
 scaler_data.transform(df_val), 
 columns=df_val.columns
 )
 df_val_transformed[target_col] = df_val[target_col]
 df_test_transformed = pd.DataFrame(
 scaler_data.transform(df_test), 
 columns=df_test.columns
 )
 df_test_transformed[target_col] = df_test[target_col]
 return df_train_transformed, df_val_transformed, df_test_transformed


print(f"===========================================================")
print(f"Starting pre-processing")
print(f"Reading parameters")

# reading job parameters
args = read_parameters()
print(f"Parameters read: {args}")

# set input path
input_data_path = "data/raw/house_pricing.csv"

# read data input
df = pd.read_csv(input_data_path)

# move target to first col
df = change_target_to_first_col(df, args.target_col)

# split dataset into train, validation and test
df_train, df_val, df_test = split_dataset(
 df,
 train_size=args.train_size,
 val_size=args.val_size,
 test_size=args.test_size,
 random_state=args.random_state
)

# scale datasets
df_train_transformed, df_val_transformed, df_test_transformed = scale_dataset(
 df_train, 
 df_val, 
 df_test,
 args.target_col
)

df_train_transformed.to_csv('data/script_processed/train/train.csv', sep=',', index=False, header=False)
df_val_transformed.to_csv('data/script_processed/validation/validation.csv', sep=',', index=False, header=False)
df_test_transformed.to_csv('data/script_processed/test/test.csv', sep=',', index=False, header=False)



print(f"Ending pre-processing")
print(f"===========================================================")

In [None]:
!python local_preprocessing.py --train_size 0.6 --val_size 0.2 --test_size 0.2 --random_state 42
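
The flags in the command above reach the script as plain strings; `argparse` converts them according to the `type` given for each argument, and `parse_known_args` sets aside flags that were never declared instead of failing. The sketch below illustrates this with a reduced version of `read_parameters`. The `argv` parameter and the `--not_declared` flag are assumptions added here so the behaviour can be shown without a command line.

```python
import argparse


def read_parameters(argv=None):
    """Parse known job parameters and return them with any unknown leftovers."""
    parser = argparse.ArgumentParser()
    parser.add_argument('--train_size', type=float, default=0.6)
    parser.add_argument('--random_state', type=int, default=42)
    # parse_known_args returns (namespace, list_of_unrecognized_arguments)
    params, unknown = parser.parse_known_args(argv)
    return params, unknown


params, unknown = read_parameters(
    ['--train_size', '0.7', '--random_state', '7', '--not_declared', 'x']
)
print(type(params.train_size).__name__, type(params.random_state).__name__, unknown)
```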

Let's compare the `train.csv` file that was an output of the script to our `df_train_transformed` variable we have saved in memory to make sure they're the same and our preprocessing script worked.

In [None]:
from pandas.testing import assert_frame_equal
locally_processed_df_train_transformed = pd.read_csv(
 'data/script_processed/train/train.csv', 
 sep=',', 
 header=None
)
locally_processed_df_train_transformed.columns = df_train_transformed.columns
try:
 assert_frame_equal(df_train_transformed, locally_processed_df_train_transformed)
 print("Nice! They match.")
except Exception as e:
 print(e)
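
By default, `assert_frame_equal` compares floating-point values with a small relative tolerance rather than bit for bit. The standard-library sketch below mimics that idea for a CSV round trip. It is a simplified stand-in and not what pandas does internally; `rows_match` is a hypothetical helper and the sample rows are made up.

```python
import csv
import io
import math


def rows_match(expected, csv_text, rel_tol=1e-5):
    """Check that headerless CSV text holds the same numbers as `expected`."""
    parsed = [[float(cell) for cell in row] for row in csv.reader(io.StringIO(csv_text))]
    if len(parsed) != len(expected):
        return False
    return all(
        len(a) == len(b) and all(math.isclose(x, y, rel_tol=rel_tol) for x, y in zip(a, b))
        for a, b in zip(expected, parsed)
    )


in_memory = [[250000.0, 0.123456789], [310000.0, -1.5]]  # made-up rows

# write the rows without a header, as the preprocessing script does
buffer = io.StringIO()
csv.writer(buffer).writerows(in_memory)

print(rows_match(in_memory, buffer.getvalue()))
```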

OK, so you can process data locally, but this is a small dataset. What if you need to process hundreds of gigabytes or even terabytes of data? The processing done so far has been constrained by local resources: this notebook runs on a single instance with limited memory and compute, so we can only process so much data with it.

In order to process larger amounts of data in a reasonable time, we really need to distribute our processing across a cluster of instances. Fortunately, SageMaker has a feature called SageMaker Processing that can help us with this task.

## SageMaker Processing
 
To process large amounts of data, we fortunately do not need to write distributed code ourselves. Instead, we can use [SageMaker Processing](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html), which runs the processing _outside_ of this notebook's resources and can apply our processing script to multiple data files in parallel.
 
Keep in mind that in a typical SageMaker workflow, notebooks are only used for initial model development activities and can be run on relatively inexpensive and less powerful instances. However, to run similar tasks at scale, data scientists require access to more powerful SageMaker managed compute instances for data preparation, training, and model hosting tasks. 

SageMaker Processing includes off-the-shelf support for [scikit-learn](https://docs.aws.amazon.com/sagemaker/latest/dg/use-scikit-learn-processing-container.html), [PySpark](https://docs.aws.amazon.com/sagemaker/latest/dg/use-spark-processing-container.html), and [other frameworks](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job-frameworks.html) like Hugging Face, MXNet, PyTorch, TensorFlow, and XGBoost. You can even bring your own container if none of the built-in containers suits your use case.

To leverage SageMaker Processing, we'll need our raw data in S3.

In [None]:
# Upload raw data to S3
raw_data_s3_prefix = '{}/data/raw'.format(s3_prefix)
raw_s3 = sess.upload_data(path='./data/raw/house_pricing.csv', key_prefix=raw_data_s3_prefix)
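
For a single file, `upload_data` returns an S3 URI made of the bucket, the key prefix, and the original file name. The sketch below reconstructs that URI and the path at which the file appears inside a processing container once the URI is mounted at a destination directory. `expected_s3_uri` and `container_input_path` are hypothetical helpers, and the bucket name is a made-up placeholder; nothing here calls AWS.

```python
import posixpath


def expected_s3_uri(bucket, key_prefix, local_path):
    """Build the URI a single-file upload is expected to land at."""
    filename = posixpath.basename(local_path)
    return f"s3://{bucket}/{key_prefix}/{filename}"


def container_input_path(destination, s3_uri):
    """Path of the downloaded object inside the processing container."""
    return posixpath.join(destination, posixpath.basename(s3_uri))


# placeholder bucket name, for illustration only
uri = expected_s3_uri(
    "sagemaker-us-east-1-111122223333",
    "mlops-workshop/data/raw",
    "./data/raw/house_pricing.csv",
)
print(uri)
print(container_input_path("/opt/ml/processing/input", uri))
```

The second path is the one a processing script has to read from when the input is mounted at `/opt/ml/processing/input`.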

Next, we'll reuse our Python processing script with one simple modification: the local paths used to read the raw data and save the processed data are replaced with the corresponding container paths.

In [None]:
%%writefile ./pipeline_scripts/preprocessing.py

import pandas as pd
import argparse
import os
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split


def read_parameters():
 """
 Read job parameters
 Returns:
 (Namespace): read parameters
 """
 parser = argparse.ArgumentParser()
 parser.add_argument('--train_size', type=float, default=0.6)
 parser.add_argument('--val_size', type=float, default=0.2)
 parser.add_argument('--test_size', type=float, default=0.2)
 parser.add_argument('--random_state', type=int, default=42)
 parser.add_argument('--target_col', type=str, default='PRICE')
 params, _ = parser.parse_known_args()
 return params


def change_target_to_first_col(df, target_col):
 # shift column 'PRICE' to first position
 first_column = df.pop(target_col)
 
 # insert column using insert(position,column_name,
 # first_column) function
 df.insert(0, target_col, first_column)
 return df


def split_dataset(df, train_size, val_size, test_size, random_state=None):
 """
 Split dataset into train, validation and test samples
 Args:
 df (pandas.DataFrame): input data
 train_size (float): ratio of data to use as training dataset
 val_size (float): ratio of data to use as validation dataset
 test_size (float): ratio of data to use as test dataset
 random_state (int): Pass an int for reproducible output across multiple function calls.
 Returns:
 df_train (pandas.DataFrame): train dataset
 df_val (pandas.DataFrame): validation dataset
 df_test (pandas.DataFrame): test dataset
 """
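 # compare with a tolerance: float sums can miss 1.0 by a rounding error (e.g. 0.1 + 0.2 != 0.3)
 if abs(train_size + val_size + test_size - 1.0) > 1e-9:
     raise ValueError("train_size, val_size and test_size must sum up to 1.0")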
 rest_size = 1 - train_size
 df_train, df_rest = train_test_split(
 df,
 test_size=rest_size,
 train_size=train_size,
 random_state=random_state
 )
 df_val, df_test = train_test_split(
 df_rest,
 test_size=(test_size / rest_size),
 train_size=(val_size / rest_size),
 random_state=random_state
 )
 df_train.reset_index(inplace=True, drop=True)
 df_val.reset_index(inplace=True, drop=True)
 df_test.reset_index(inplace=True, drop=True)
 train_perc = int((len(df_train)/len(df)) * 100)
 print(f"Training size: {len(df_train)} - {train_perc}% of total")
 val_perc = int((len(df_val)/len(df)) * 100)
 print(f"Val size: {len(df_val)} - {val_perc}% of total")
 test_perc = int((len(df_test)/len(df)) * 100)
 print(f"Test size: {len(df_test)} - {test_perc}% of total")
 return df_train, df_val, df_test


def scale_dataset(df_train, df_val, df_test, target_col):
 """
 Fit StandardScaler to df_train and apply it to df_val and df_test
 Args:
 df_train (pandas.DataFrame): train dataset
 df_val (pandas.DataFrame): validation dataset
 df_test (pandas.DataFrame): test dataset
 target_col (str): target col
 Returns:
 df_train_transformed (pandas.DataFrame): train data scaled
 df_val_transformed (pandas.DataFrame): val data scaled
 df_test_transformed (pandas.DataFrame): test data scaled
 """
 scaler_data = StandardScaler()
 
 # fit scaler to training dataset
 print("Fitting scaling to training data and transforming dataset...")
 df_train_transformed = pd.DataFrame(
 scaler_data.fit_transform(df_train), 
 columns=df_train.columns
 )
 df_train_transformed[target_col] = df_train[target_col]
 
 # apply scaler to validation and test datasets
 print("Transforming validation and test datasets...")
 df_val_transformed = pd.DataFrame(
 scaler_data.transform(df_val), 
 columns=df_val.columns
 )
 df_val_transformed[target_col] = df_val[target_col]
 df_test_transformed = pd.DataFrame(
 scaler_data.transform(df_test), 
 columns=df_test.columns
 )
 df_test_transformed[target_col] = df_test[target_col]
 return df_train_transformed, df_val_transformed, df_test_transformed


print(f"===========================================================")
print(f"Starting pre-processing")
print(f"Reading parameters")

# reading job parameters
args = read_parameters()
print(f"Parameters read: {args}")

# set input and output paths
input_data_path = "/opt/ml/processing/input/house_pricing.csv"
train_data_path = "/opt/ml/processing/output/train"
val_data_path = "/opt/ml/processing/output/validation"
test_data_path = "/opt/ml/processing/output/test"

# create the output directories if they do not exist yet
for path in (train_data_path, val_data_path, test_data_path):
    os.makedirs(path, exist_ok=True)


# read data input
df = pd.read_csv(input_data_path)

# move target to first col
df = change_target_to_first_col(df, args.target_col)

# split dataset into train, validation and test
df_train, df_val, df_test = split_dataset(
 df,
 train_size=args.train_size,
 val_size=args.val_size,
 test_size=args.test_size,
 random_state=args.random_state
)

# scale datasets
df_train_transformed, df_val_transformed, df_test_transformed = scale_dataset(
 df_train, 
 df_val, 
 df_test,
 args.target_col
)

print("Saving data")
df_train_transformed.to_csv(train_data_path+'/train.csv', sep=',', index=False, header=False)
df_val_transformed.to_csv(val_data_path+'/validation.csv', sep=',', index=False, header=False)
df_test_transformed.to_csv(test_data_path+'/test.csv', sep=',', index=False, header=False)



print(f"Ending pre-processing")
print(f"===========================================================")

Since we're using scikit-learn in our processing script, we'll use the scikit-learn processor, which the SageMaker SDK calls `SKLearnProcessor` (the container associated with this processor also includes common libraries like `pandas`). This object allows you to specify the instance type to use in the job as well as how many instances you want in your cluster. Because the synthetic housing dataset is small and consists of a single file, one instance (`processing_instance_count = 1`) is enough here. For larger datasets split across multiple files, increasing the instance count is all it takes to spin up a cluster for SageMaker Processing and parallelize your processing code.

In [None]:
sklearn_processor = SKLearnProcessor(
 framework_version='1.0-1',
 role=role_arn,
 instance_type=processing_instance_type,
 instance_count=processing_instance_count
)

We're now ready to run the Processing job.

In [None]:
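from IPython.display import display, HTML
# code= can also be an S3 URI pointing to the input script
job_name = f"processing-{strftime('%Y-%m-%d-%H-%M-%S')}"

# show a link to the job in the SageMaker console
display(
    HTML(
        '<b>Review <a target="blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/processing-jobs/{}">Processing Job</a></b>'.format(
            region, job_name
        )
    )
)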
sklearn_processor.run(
 code='pipeline_scripts/preprocessing.py',
 job_name=job_name,
 inputs=[
 ProcessingInput(
 source=raw_s3,
 destination='/opt/ml/processing/input',
 s3_data_distribution_type='ShardedByS3Key'
 )
 ],
 outputs=[
 ProcessingOutput(
 output_name='train',
 destination=f'{output_path}/train',
 source='/opt/ml/processing/output/train'
 ), 
 ProcessingOutput(
 output_name='validation',
 destination=f'{output_path}/validation',
 source='/opt/ml/processing/output/validation'
 ),
 ProcessingOutput(
 output_name='test',
 destination=f'{output_path}/test',
 source='/opt/ml/processing/output/test'
 )
 ],
 # notice that all arguments passed to a SageMaker processing job should be strings as they are transformed to command line parameters.
 # Your read_parameters function will handle the data types for your code 
 arguments=[
 "--train_size", str(train_size),
 "--val_size", str(val_size),
 "--test_size", str(test_size),
 "--random_state", str(random_seed)
 ]
)
preprocessing_job_description = sklearn_processor.jobs[-1].describe()
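
With `s3_data_distribution_type='ShardedByS3Key'`, each instance in the cluster receives only a subset of the S3 objects under the input prefix rather than a full copy. The sketch below simulates that idea in plain Python. The round-robin assignment and the `shard_keys` helper are assumptions made for illustration, since the exact assignment SageMaker uses is not described here, and the object keys are made up.

```python
def shard_keys(keys, instance_count):
    """Distribute object keys across instances (round-robin, for illustration)."""
    shards = [[] for _ in range(instance_count)]
    for i, key in enumerate(sorted(keys)):
        shards[i % instance_count].append(key)
    return shards


# a single input file: with two instances, one of them receives no data
print(shard_keys(["data/raw/house_pricing.csv"], 2))

# several (made-up) files: the work is spread across the cluster
print(shard_keys(["part-0.csv", "part-1.csv", "part-2.csv", "part-3.csv"], 2))
```

Sharding only pays off when the input is split into multiple objects, and a script that runs on several instances must tolerate receiving a different subset of files on each one.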

Now that the SageMaker Processing job has finished, the processed data is available in S3. Let's download that data locally and ensure it's what we expect.

In [None]:
# Download processed data from S3 to local storage
!aws s3 cp {output_path} ./data/sm_processed/ --recursive

In [None]:
sm_processed_df_train_transformed = pd.read_csv('data/sm_processed/train/train.csv', sep=',', header=None)
sm_processed_df_train_transformed.columns = df_train_transformed.columns
try:
 assert_frame_equal(df_train_transformed, sm_processed_df_train_transformed)
 print("Again they match!")
except Exception as e:
 print(e)

## SageMaker Feature Store
 
Features are inputs to ML models used during training and inference. Features are often reused by multiple teams, and feature quality is critical to model accuracy. In addition, when features used to train models offline in batch are also made available for real-time inference, it is hard to keep the two feature stores synchronized. [SageMaker Feature Store](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store.html) provides a secure and unified store for features across the ML lifecycle.

Let's now switch the storage of our processed data from S3 to SageMaker Feature Store.

![Notebook1](images/notebook-1fs.png)

**Feature Groups**

First, let's define names for the train, validation and test feature groups, and an S3 bucket prefix under which the feature store results are stored.

In [None]:
# no leading '-' here: the names below already add the separator
time_str = time.strftime('%Y-%m-%d-%H-%M-%S')
train_feature_group_name = "fs-train-"+time_str
validation_feature_group_name = "fs-validation-"+time_str
test_feature_group_name = "fs-test-"+time_str
bucket_prefix = "mlops-workshop/feature-store"

Next, we'll modify our Python processing script to include the creation of feature groups for the train, validation and test datasets and the ingestion of data into those feature groups.
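
Every record in a feature group needs a record identifier and an event time. The sketch below shows that preparation step on plain dictionaries instead of a DataFrame. It is a simplified illustration: `prepare_records_for_feature_store` is a hypothetical helper, the `event_time` parameter is an addition that keeps the example deterministic, and the sample records are made up.

```python
import time


def prepare_records_for_feature_store(records, event_time=None):
    """Add 'event_time' (Unix seconds) and 'record_id' (row position) to each record."""
    # use the current time unless a fixed timestamp is supplied
    ts = float(int(round(time.time()))) if event_time is None else float(event_time)
    return [
        {**record, 'event_time': ts, 'record_id': position}
        for position, record in enumerate(records)
    ]


rows = [{'PRICE': 250000.0}, {'PRICE': 310000.0}]  # made-up records
prepared = prepare_records_for_feature_store(rows, event_time=1700000000)
print(prepared)
```

Using the row position as the identifier works here because each dataset is ingested into its own feature group, so identifiers only need to be unique within one dataset.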

In [None]:
%%writefile ./pipeline_scripts/preprocessing_with_fs.py
import subprocess
import sys
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker'])

from sagemaker.feature_store.feature_group import FeatureGroup
import pandas as pd
import argparse
import os
import numpy as np
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split
import time
import sagemaker
import boto3


def read_parameters():
 """
 Read job parameters
 Returns:
 (Namespace): read parameters
 """
 parser = argparse.ArgumentParser()
 parser.add_argument('--train_size', type=float, default=0.6)
 parser.add_argument('--val_size', type=float, default=0.2)
 parser.add_argument('--test_size', type=float, default=0.2)
 parser.add_argument('--random_state', type=int, default=42)
 parser.add_argument('--train_feature_group_name', type=str, default='fs-train')
 parser.add_argument('--validation_feature_group_name', type=str, default='fs-validation')
 parser.add_argument('--test_feature_group_name', type=str, default='fs-test')
 parser.add_argument('--bucket_prefix', type=str, default='mlops-workshop/feature-store')
 parser.add_argument('--target_col', type=str, default='PRICE')
 parser.add_argument('--region', type=str)
 parser.add_argument('--role_arn', type=str)
 params, _ = parser.parse_known_args()
 return params


def change_target_to_first_col(df, target_col):
 # shift column 'PRICE' to first position
 first_column = df.pop(target_col)
 
 # insert column using insert(position,column_name,
 # first_column) function
 df.insert(0, target_col, first_column)
 return df


def split_dataset(df, train_size, val_size, test_size, random_state=None):
 """
 Split dataset into train, validation and test samples
 Args:
 df (pandas.DataFrame): input data
 train_size (float): ratio of data to use as training dataset
 val_size (float): ratio of data to use as validation dataset
 test_size (float): ratio of data to use as test dataset
 random_state (int): Pass an int for reproducible output across multiple function calls.
 Returns:
 df_train (pandas.DataFrame): train dataset
 df_val (pandas.DataFrame): validation dataset
 df_test (pandas.DataFrame): test dataset
 """
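 # compare with a tolerance: float sums can miss 1.0 by a rounding error (e.g. 0.1 + 0.2 != 0.3)
 if abs(train_size + val_size + test_size - 1.0) > 1e-9:
     raise ValueError("train_size, val_size and test_size must sum up to 1.0")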
 rest_size = 1 - train_size
 df_train, df_rest = train_test_split(
 df,
 test_size=rest_size,
 train_size=train_size,
 random_state=random_state
 )
 df_val, df_test = train_test_split(
 df_rest,
 test_size=(test_size / rest_size),
 train_size=(val_size / rest_size),
 random_state=random_state
 )
 df_train.reset_index(inplace=True, drop=True)
 df_val.reset_index(inplace=True, drop=True)
 df_test.reset_index(inplace=True, drop=True)
 train_perc = int((len(df_train)/len(df)) * 100)
 print(f"Training size: {len(df_train)} - {train_perc}% of total")
 val_perc = int((len(df_val)/len(df)) * 100)
 print(f"Val size: {len(df_val)} - {val_perc}% of total")
 test_perc = int((len(df_test)/len(df)) * 100)
 print(f"Test size: {len(df_test)} - {test_perc}% of total")
 return df_train, df_val, df_test


def scale_dataset(df_train, df_val, df_test, target_col):
 """
 Fit StandardScaler to df_train and apply it to df_val and df_test
 Args:
 df_train (pandas.DataFrame): train dataset
 df_val (pandas.DataFrame): validation dataset
 df_test (pandas.DataFrame): test dataset
 target_col (str): target col
 Returns:
 df_train_transformed (pandas.DataFrame): train data scaled
 df_val_transformed (pandas.DataFrame): val data scaled
 df_test_transformed (pandas.DataFrame): test data scaled
 """
 scaler_data = StandardScaler()
 
 # fit scaler to training dataset
 print("Fitting scaling to training data and transforming dataset...")
 df_train_transformed = pd.DataFrame(
 scaler_data.fit_transform(df_train), 
 columns=df_train.columns
 )
 df_train_transformed[target_col] = df_train[target_col]
 
 # apply scaler to validation and test datasets
 print("Transforming validation and test datasets...")
 df_val_transformed = pd.DataFrame(
 scaler_data.transform(df_val), 
 columns=df_val.columns
 )
 df_val_transformed[target_col] = df_val[target_col]
 df_test_transformed = pd.DataFrame(
 scaler_data.transform(df_test), 
 columns=df_test.columns
 )
 df_test_transformed[target_col] = df_test[target_col]
 return df_train_transformed, df_val_transformed, df_test_transformed


def prepare_df_for_feature_store(df, data_type):
 """
 Add event time and record id to df in order to store it in SageMaker Feature Store
 Args:
 df (pandas.DataFrame): data to be prepared
 data_type (str): train/validation or test
 Returns:
 df (pandas.DataFrame): dataframe with event time and record id
 """
 print(f'Preparing {data_type} data for Feature Store..')
 current_time_sec = int(round(time.time()))
 # create event time
 df['event_time'] = pd.Series([current_time_sec]*len(df), dtype="float64")
 # create record id from index
 df['record_id'] = df.reset_index().index
 return df
 

def wait_for_feature_group_creation_complete(feature_group):
 """
 Function that waits for feature group to be created in SageMaker Feature Store
 Args:
 feature_group (sagemaker.feature_store.feature_group.FeatureGroup): Feature Group
 """
 status = feature_group.describe().get('FeatureGroupStatus')
 print(f'Initial status: {status}')
 # poll until the feature group leaves the 'Creating' state
 while status == 'Creating':
     print(f'Waiting for feature group: {feature_group.name} to be created ...')
     time.sleep(5)
     status = feature_group.describe().get('FeatureGroupStatus')
 if status != 'Created':
     raise SystemExit(f'Failed to create feature group {feature_group.name}: {status}')
 print(f'FeatureGroup {feature_group.name} was successfully created.')
 

def create_feature_group(feature_group_name, sagemaker_session, df, prefix, role_arn):
 """
 Create Feature Store Group
 Args:
 feature_group_name (str): Feature Store Group Name
 sagemaker_session (sagemaker.session.Session): sagemaker session
 df (pandas.DataFrame): dataframe to ingest, used to create the feature definitions
 prefix (str): feature group prefix (train/validation or test)
 role_arn (str): role arn to create feature store
 Returns:
 fs_group (sagemaker.feature_store.feature_group.FeatureGroup): Feature Group
 """
 fs_group = FeatureGroup(
 name=feature_group_name, 
 sagemaker_session=sagemaker_session
 )
 fs_group.load_feature_definitions(data_frame=df)
 default_bucket = sagemaker_session.default_bucket()
 print(f'Creating feature group: {fs_group.name} ...')
 fs_group.create(
 s3_uri=f's3://{default_bucket}/{prefix}', 
 record_identifier_name='record_id', 
 event_time_feature_name='event_time', 
 role_arn=role_arn, 
 enable_online_store=True
 )
 wait_for_feature_group_creation_complete(fs_group)
 return fs_group


def ingest_features(fs_group, df):
 """
 Ingest features to Feature Store Group
 Args:
 fs_group (sagemaker.feature_store.feature_group.FeatureGroup): Feature Group
 df (pandas.DataFrame): dataframe to ingest
 """
 print(f'Ingesting data into feature group: {fs_group.name} ...')
 fs_group.ingest(data_frame=df, max_processes=3, wait=True)
 print(f'{len(df)} records ingested into feature group: {fs_group.name}')
 return


print(f"===========================================================")
print(f"Starting pre-processing")
print(f"Reading parameters")

# reading job parameters
args = read_parameters()
print(f"Parameters read: {args}")
sagemaker_session = sagemaker.Session(boto3.Session(region_name=args.region))

# set input path
input_data_path = "/opt/ml/processing/input/house_pricing.csv"

# read data input
df = pd.read_csv(input_data_path)

# move target to first col
df = change_target_to_first_col(df, args.target_col)

# split dataset into train, validation and test
df_train, df_val, df_test = split_dataset(
 df,
 train_size=args.train_size,
 val_size=args.val_size,
 test_size=args.test_size,
 random_state=args.random_state
)

# scale datasets
df_train_transformed, df_val_transformed, df_test_transformed = scale_dataset(
 df_train, 
 df_val, 
 df_test,
 args.target_col
)

# prepare datasets for Feature Store
df_train_transformed_fs = prepare_df_for_feature_store(df_train_transformed, 'train')
df_val_transformed_fs = prepare_df_for_feature_store(df_val_transformed, 'validation')
df_test_transformed_fs = prepare_df_for_feature_store(df_test_transformed, 'test')

# create the feature groups and ingest the datasets into Feature Store
fs_group_train = create_feature_group(
 args.train_feature_group_name, 
 sagemaker_session, 
 df_train_transformed_fs, 
 args.bucket_prefix+'/train',
 args.role_arn
)
ingest_features(fs_group_train, df_train_transformed_fs)

fs_group_validation = create_feature_group(
 args.validation_feature_group_name, 
 sagemaker_session, 
 df_val_transformed_fs, 
 args.bucket_prefix+'/validation',
 args.role_arn
)
ingest_features(fs_group_validation, df_val_transformed_fs)

fs_group_test = create_feature_group(
 args.test_feature_group_name, 
 sagemaker_session, 
 df_test_transformed_fs, 
 args.bucket_prefix+'/test',
 args.role_arn
)
ingest_features(fs_group_test, df_test_transformed_fs)

print(f"Ending pre-processing")
print(f"===========================================================")
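The waiting loop in the script above polls until the status leaves `Creating`, with no upper bound on how long it waits. As a minimal sketch of a bounded variant, the helper below assumes only that the object exposes `describe()` returning a dict with a `FeatureGroupStatus` key, as the script uses it. The name `wait_for_status`, the `timeout_s`, `poll_s` and `sleep` parameters, and the `FakeFeatureGroup` stand-in are hypothetical and exist only for illustration.

```python
import time


def wait_for_status(feature_group, timeout_s=300, poll_s=5, sleep=time.sleep):
    """Poll describe() until the status leaves 'Creating' or the timeout expires.

    Hypothetical helper: `feature_group` only needs a describe() method that
    returns a dict containing 'FeatureGroupStatus'.
    """
    waited = 0
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        if waited >= timeout_s:
            raise TimeoutError(f"Feature group still 'Creating' after {waited} seconds")
        sleep(poll_s)
        waited += poll_s
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Feature group creation failed: {status}")
    return status


class FakeFeatureGroup:
    """Stand-in used only to exercise the helper without calling AWS."""

    def __init__(self, statuses):
        self._statuses = iter(statuses)

    def describe(self):
        return {"FeatureGroupStatus": next(self._statuses)}


# Simulate two 'Creating' responses followed by success, without real sleeping.
fake_group = FakeFeatureGroup(["Creating", "Creating", "Created"])
final_status = wait_for_status(fake_group, sleep=lambda seconds: None)
print(final_status)
```

Raising `TimeoutError` or `RuntimeError` instead of `SystemExit` is a design choice of this sketch: it lets calling code decide whether to retry or stop.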

We're now ready to run the Processing job with the Feature Store code included in it.

In [None]:
# `code` can also be an S3 URI that points to the input script
job_name = f"processing-with-fs-{strftime('%Y-%m-%d-%H-%M-%S')}"

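display(
    HTML(
        '<b>Review <a target="blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/processing-jobs/{}">Feature Store Processing Job</a></b>'.format(
            region, job_name
        )
    )
)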

sklearn_processor.run(
 code='pipeline_scripts/preprocessing_with_fs.py',
 job_name=job_name,
 inputs=[
 ProcessingInput(
 source=raw_s3,
 destination='/opt/ml/processing/input',
 s3_data_distribution_type='ShardedByS3Key'
 )
 ],
 # Note: all arguments passed to a SageMaker Processing job must be strings, because they are converted to command-line parameters.
 # The read_parameters function in the script converts them back to the data types your code needs.
 arguments=[
 "--train_size", str(train_size),
 "--val_size", str(val_size),
 "--test_size", str(test_size),
 "--random_state", str(random_seed),
 "--train_feature_group_name", train_feature_group_name,
 "--validation_feature_group_name", validation_feature_group_name,
 "--test_feature_group_name", test_feature_group_name,
 "--bucket_prefix", bucket_prefix,
 "--role_arn", role_arn,
 "--region", region
 ]
)

preprocessing_job_description = sklearn_processor.jobs[-1].describe()
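The `read_parameters` function itself is defined earlier in the script. As a rough sketch of how string arguments can be turned back into typed values with `argparse`, the example below uses a subset of the argument names passed above. The function name `parse_job_arguments` and all default values are assumptions made for illustration, not the script's actual settings.

```python
import argparse


def parse_job_arguments(argv):
    """Illustrative parser: converts command-line strings into typed values."""
    parser = argparse.ArgumentParser()
    # `type=` tells argparse how to convert each incoming string
    parser.add_argument("--train_size", type=float, default=0.6)
    parser.add_argument("--val_size", type=float, default=0.2)
    parser.add_argument("--test_size", type=float, default=0.2)
    parser.add_argument("--random_state", type=int, default=42)
    parser.add_argument("--region", type=str, default="us-east-1")
    # parse_known_args ignores arguments this sketch does not declare
    parsed, _unknown = parser.parse_known_args(argv)
    return parsed


# Every value arrives as a string, exactly as in the `arguments` list above.
parsed = parse_job_arguments(
    ["--train_size", str(0.7), "--val_size", "0.2", "--test_size", "0.1", "--random_state", str(7)]
)
print(type(parsed.train_size).__name__, type(parsed.random_state).__name__)
```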

Let's now load the features from Feature Store.

In [None]:
fs_group_train = FeatureGroup(name=train_feature_group_name, sagemaker_session=sess) 

In [None]:
train_query = fs_group_train.athena_query()
train_table = train_query.table_name

And let's display the data in a DataFrame:

**Important**: If your DataFrame is empty, wait a few minutes before trying again. It can take a while for Feature Store data to become available to Athena queries.

In [None]:
import time

fs_df = pd.DataFrame()
while len(fs_df) == 0:
    # an empty result that already has columns means a query ran but the data is not available yet
    if len(fs_df.columns) > 0:
        time.sleep(120)
    query_string = f'SELECT * FROM "sagemaker_featurestore"."{train_table}" ORDER BY record_id'
    query_results = 'sagemaker-featurestore'
    output_location = f's3://{bucket}/{query_results}/query_results/'
    train_query.run(query_string=query_string, output_location=output_location)
    train_query.wait()
    fs_df = train_query.as_dataframe()
fs_df

**Adapting the Athena Query**

Notice that the result above includes extra columns that let us audit and monitor the data in Feature Store.

You can change the query to select only the columns your model needs.

In [None]:
features_to_select = 'price,year_built,square_feet,num_bedrooms,num_bathrooms,lot_acres,garage_spaces,front_porch,deck'
query_string = f'SELECT {features_to_select} FROM "sagemaker_featurestore"."{train_table}" ORDER BY record_id'
query_results= 'sagemaker-featurestore'
output_location = f's3://{bucket}/{query_results}/query_results/'
train_query.run(query_string=query_string, output_location=output_location)
train_query.wait()
fs_df = train_query.as_dataframe()
fs_df
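Keeping the selected columns in a Python list can make the query easier to adjust and reuse. The sketch below builds the same kind of query string as the cell above. `build_select_query` is a hypothetical helper and `example_table` is a placeholder, not the real Athena table name.

```python
def build_select_query(columns, table, database="sagemaker_featurestore", order_by="record_id"):
    """Build a SELECT statement for the given columns (hypothetical helper)."""
    if not columns:
        raise ValueError("At least one column is required")
    column_list = ",".join(columns)
    return f'SELECT {column_list} FROM "{database}"."{table}" ORDER BY {order_by}'


# Same columns as in features_to_select above, kept as a list
feature_columns = [
    "price", "year_built", "square_feet", "num_bedrooms", "num_bathrooms",
    "lot_acres", "garage_spaces", "front_porch", "deck",
]

# "example_table" is a placeholder; in the notebook this would be train_table
example_query = build_select_query(feature_columns, "example_table")
print(example_query)
```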

Next, we'll store some variables that will be used in our second notebook...

In [None]:
%store train_feature_group_name
%store validation_feature_group_name
%store test_feature_group_name
%store s3_prefix
%store output_path
%store features_to_select
%store sm_processed_train_dir
%store sm_processed_validation_dir
%store sm_processed_test_dir
%store raw_s3