# Lab 1: Data Preparation and Processing

### Loading stored variables
If you ran this notebook before, you may want to reuse the resources you already created with AWS. Run the cell below to load any previously created variables. You should see a printout of the existing variables. If nothing is printed, this is probably the first time you are running the notebook.

* Make sure to run the `prerequisites.ipynb` notebook in the notebook folder before running this notebook.

In [None]:
%store -r
%store

Note: you must have run the previous notebooks in sequence for the StoreMagic (`%store`) command to retrieve their variables.

## Preparation


Let's start by specifying:

- The S3 bucket and prefix that you want to use for training and model data. The bucket should be in the same Region as the notebook instance, training, and hosting.
- The IAM role ARN used to give training and hosting access to your data. See the documentation for how to create these. Note: if more than one role is required for notebook instances, training, and/or hosting, replace the `get_execution_role()` call with the appropriate full IAM role ARN string(s).

In [None]:
# cell 01
import boto3
import sagemaker
from sagemaker import get_execution_role

# Create one SageMaker session and reuse it for the default bucket
sagemaker_session = sagemaker.Session()
bucket = sagemaker_session.default_bucket()
prefix = 'sagemaker/DEMO-xgboost-tripfare'

# Define IAM role
role = get_execution_role()

In [None]:
# cell 02
import numpy as np # For matrix operations and numerical processing
import pandas as pd # For munging tabular data
import matplotlib.pyplot as plt # For charts and visualizations
from IPython.display import Image # For displaying images in the notebook
from IPython.display import display # For displaying outputs in the notebook
from time import gmtime, strftime # For labeling SageMaker models, endpoints, etc.
import sys # For writing outputs to notebook
import math # For ceiling function
import json # For parsing hosting outputs
import os # For manipulating filepath names
import sagemaker # Amazon SageMaker's Python SDK provides many helper functions
import zipfile # For reading zip archives

In [None]:
if 'input_source' not in locals():
    input_source = f's3://{bucket}/{prefix}/input/'
    %store input_source
else:
    print(f'input source is available: {input_source}')

input_data = input_source + 'data/green/'
%store input_data
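`input_data` is an S3 URI built by string concatenation, so a stored `input_source` without a trailing slash could silently produce a wrong prefix. As a sketch (the helper name `split_s3_uri` is hypothetical and not part of the SageMaker SDK), the standard library can split such a URI into bucket and key prefix for a quick sanity check:

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> tuple:
    """Split 's3://bucket/key/prefix/' into ('bucket', 'key/prefix/').

    Hypothetical helper for illustration; not a SageMaker SDK function.
    """
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"Not an S3 URI: {uri!r}")
    # urlparse keeps the leading '/' in the path; S3 keys do not start with one
    return parsed.netloc, parsed.path.lstrip("/")


# Example with a placeholder bucket name
bucket_name, key_prefix = split_s3_uri("s3://example-bucket/sagemaker/DEMO-xgboost-tripfare/input/data/green/")
print(bucket_name)  # example-bucket
print(key_prefix)   # sagemaker/DEMO-xgboost-tripfare/input/data/green/
```

In the notebook, `split_s3_uri(input_data)` should return the default bucket and a key prefix ending in `data/green/`.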

In [None]:
# cell 05
from sagemaker import Session

sess = Session()

# Create Feature Groups

In this notebook, you will learn how to create a feature group in Amazon SageMaker Feature Store. You will then learn how to ingest feature columns into the created feature group using the SageMaker Python SDK, and how to retrieve an ingested feature record from the online store. Finally, you will see how to list all the feature groups created within the Feature Store and delete them.

In [None]:
column_schemas = [
    {"name": "fare_amount", "type": "float"},
    {"name": "passenger_count", "type": "long"},
    {"name": "pickup_latitude", "type": "float"},
    {"name": "pickup_longitude", "type": "float"},
    {"name": "dropoff_latitude", "type": "float"},
    {"name": "dropoff_longitude", "type": "float"},
    {"name": "geo_distance", "type": "float"},
    {"name": "hour", "type": "int"},
    {"name": "weekday", "type": "int"},
    {"name": "month", "type": "int"},
    {"name": "FS_ID", "type": "long"},
    {"name": "FS_time", "type": "float"},
]

In [None]:
from sagemaker.feature_store.feature_definition import FeatureDefinition
from sagemaker.feature_store.feature_definition import FeatureTypeEnum

# Schema types that are not in the mapping fall back to String
default_feature_type = FeatureTypeEnum.STRING
column_to_feature_type_mapping = {
    "float": FeatureTypeEnum.FRACTIONAL,
    "long": FeatureTypeEnum.INTEGRAL,
    "int": FeatureTypeEnum.INTEGRAL,  # hour, weekday and month are integers
}

feature_definitions = [
    FeatureDefinition(
        feature_name=column_schema['name'],
        feature_type=column_to_feature_type_mapping.get(column_schema['type'], default_feature_type),
    )
    for column_schema in column_schemas
]
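If `column_schemas` uses a type string that `column_to_feature_type_mapping` does not contain, `.get(...)` silently falls back to the String feature type. A small check can surface such columns before the feature group is created. In this sketch, `find_fallback_columns` is a hypothetical helper, and plain strings stand in for the `FeatureTypeEnum` values so it runs without the SDK:

```python
def find_fallback_columns(column_schemas, type_mapping):
    """Return names of columns whose schema type is not a key of type_mapping.

    Such columns would receive the default (String) feature type.
    """
    return [c["name"] for c in column_schemas if c["type"] not in type_mapping]


# Plain-string stand-ins for FeatureTypeEnum values
mapping = {"float": "Fractional", "long": "Integral"}
schemas = [
    {"name": "fare_amount", "type": "float"},
    {"name": "passenger_count", "type": "long"},
    {"name": "hour", "type": "int"},
]
print(find_fallback_columns(schemas, mapping))  # ['hour']
```

In the notebook, `find_fallback_columns(column_schemas, column_to_feature_type_mapping)` should return an empty list if every schema type is mapped.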

### Initialize & Create Feature Group

In [None]:
region = boto3.Session().region_name
boto_session = boto3.Session(region_name=region)

sagemaker_client = boto_session.client(service_name='sagemaker', region_name=region)
featurestore_runtime = boto_session.client(service_name='sagemaker-featurestore-runtime', region_name=region)

feature_store_session = sagemaker.session.Session(
 boto_session=boto_session,
 sagemaker_client=sagemaker_client,
 sagemaker_featurestore_runtime_client=featurestore_runtime
)

In [None]:
current_timestamp = strftime('%m-%d-%H-%M', gmtime())

# prefix to track all the feature groups created as part of feature store champions workshop (fscw)
fs_prefix = 'fscw-'

tripfare_feature_group_name = f'{fs_prefix}tripfare-{current_timestamp}'
%store tripfare_feature_group_name

print(f'Trip fare feature group name = {tripfare_feature_group_name}')
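Feature group names are constrained by the CreateFeatureGroup API. Assuming the rule listed in the API reference at the time of writing (at most 64 characters matching `^[a-zA-Z0-9](-*[a-zA-Z0-9]){0,63}`, that is, alphanumerics and hyphens, starting and ending with an alphanumeric), a local check can catch a bad `fs_prefix` or timestamp format before calling `create()`. `is_valid_feature_group_name` is a hypothetical helper; verify the pattern against the current documentation:

```python
import re

# Assumed naming rule for feature groups; check the current CreateFeatureGroup API reference
FEATURE_GROUP_NAME_PATTERN = re.compile(r"[a-zA-Z0-9](-*[a-zA-Z0-9]){0,63}")
MAX_NAME_LENGTH = 64


def is_valid_feature_group_name(name: str) -> bool:
    """Return True if name satisfies the assumed length limit and pattern."""
    if len(name) > MAX_NAME_LENGTH:
        return False
    return FEATURE_GROUP_NAME_PATTERN.fullmatch(name) is not None


print(is_valid_feature_group_name("fscw-tripfare-05-12-10-30"))  # True
print(is_valid_feature_group_name("fscw_tripfare"))              # False (underscore)
```

The `'%m-%d-%H-%M'` timestamp format used above only produces digits and hyphens, so names built from it pass this check.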

The feature group is initialized and created below.

In [None]:
from sagemaker.feature_store.feature_group import FeatureGroup

enable_online_store=True
feature_store_offline_s3_uri = 's3://' + bucket

record_identifier_feature_name = 'FS_ID'
event_time_feature_name = 'FS_time'

feature_group = FeatureGroup(
 name=tripfare_feature_group_name, sagemaker_session=feature_store_session, feature_definitions=feature_definitions)

feature_group.create(
 s3_uri=feature_store_offline_s3_uri,
 record_identifier_name=record_identifier_feature_name,
 event_time_feature_name=event_time_feature_name,
 role_arn=role,
 enable_online_store=enable_online_store
)

In [None]:
import time

def wait_for_feature_group_creation_complete(feature_group):
    """Poll the feature group status until creation finishes."""
    status = feature_group.describe().get("FeatureGroupStatus")
    while status == "Creating":
        print("Waiting for Feature Group Creation")
        time.sleep(5)
        status = feature_group.describe().get("FeatureGroupStatus")
    if status != "Created":
        raise RuntimeError(f"Failed to create feature group {feature_group.name}: {status}")
    print(f"FeatureGroup {feature_group.name} successfully created.")

wait_for_feature_group_creation_complete(feature_group=feature_group)
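The helper above keeps polling for as long as the status is `Creating`. A variant with a timeout fails fast if creation stalls. In this sketch, `wait_for_status` and its parameters are hypothetical names; in the notebook, `get_status` would be `lambda: feature_group.describe().get("FeatureGroupStatus")`, while the example below uses a fake status sequence so it runs without AWS:

```python
import time
from typing import Callable


def wait_for_status(
    get_status: Callable[[], str],
    pending: str = "Creating",
    expected: str = "Created",
    poll_seconds: float = 5.0,
    timeout_seconds: float = 600.0,
) -> str:
    """Poll get_status() until it leaves the pending state or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    status = get_status()
    while status == pending:
        if time.monotonic() >= deadline:
            raise TimeoutError(f"Still {pending!r} after {timeout_seconds} seconds")
        time.sleep(poll_seconds)
        status = get_status()
    if status != expected:
        # For example "CreateFailed"
        raise RuntimeError(f"Unexpected final status: {status!r}")
    return status


# Usage with a fake status sequence instead of a real feature group
statuses = iter(["Creating", "Creating", "Created"])
print(wait_for_status(lambda: next(statuses), poll_seconds=0.01))  # Created
```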

# Feature Engineering with Amazon SageMaker Processing

Amazon SageMaker Processing allows you to run steps for data pre- or post-processing, feature engineering, data validation, or model evaluation workloads on Amazon SageMaker. Processing jobs accept data from Amazon S3 as input and store data into Amazon S3 as output.

![processing](https://sagemaker.readthedocs.io/en/stable/_images/amazon_sagemaker_processing_image1.png)

Here, we'll import the dataset and transform it with SageMaker Processing, which can be used to process terabytes of data in a SageMaker-managed cluster separate from the instance running your notebook server. In a typical SageMaker workflow, notebooks are only used for prototyping and can be run on relatively inexpensive and less powerful instances, while processing, training and model hosting tasks are run on separate, more powerful SageMaker-managed instances. SageMaker Processing includes off-the-shelf support for Scikit-learn, as well as a Bring Your Own Container option, so it can be used with many different data transformation technologies and tasks. 

To use SageMaker Processing, simply supply a Python data preprocessing script as shown below. For this example, we're using a SageMaker prebuilt Scikit-learn container, which includes many common functions for processing data. There are few limitations on what kinds of code and operations you can run, and only a minimal contract: input and output data must be placed in specified directories. If this is done, SageMaker Processing automatically loads the input data from S3 and uploads transformed data back to S3 when the job is complete.
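The contract can be pictured as plain file I/O: the script reads whatever SageMaker has copied under its input directory and writes results under its output directories, and SageMaker handles the S3 transfers on both sides. The sketch below mimics that layout in a temporary directory standing in for `/opt/ml/processing`. The file names and the `process` function are hypothetical, and no SageMaker API is involved:

```python
import csv
import os
import tempfile


def process(base_dir: str) -> int:
    """Read every CSV under input/data, keep rows with a positive fare, write train/out.csv."""
    input_dir = os.path.join(base_dir, "input", "data")
    output_dir = os.path.join(base_dir, "train")
    os.makedirs(output_dir, exist_ok=True)
    kept = []
    for name in sorted(os.listdir(input_dir)):
        if not name.endswith(".csv"):
            continue
        with open(os.path.join(input_dir, name), newline="") as f:
            kept.extend(row for row in csv.reader(f) if float(row[0]) > 0)
    with open(os.path.join(output_dir, "out.csv"), "w", newline="") as f:
        csv.writer(f).writerows(kept)
    return len(kept)


# Simulate the layout that SageMaker Processing would create inside the container
with tempfile.TemporaryDirectory() as base:
    os.makedirs(os.path.join(base, "input", "data"))
    with open(os.path.join(base, "input", "data", "part-0.csv"), "w", newline="") as f:
        csv.writer(f).writerows([["12.5", "1"], ["-3.0", "2"], ["7.0", "1"]])
    print(process(base))  # 2
```

In the real job, `ProcessingInput(destination=...)` and `ProcessingOutput(source=...)` map these local directories to S3 locations.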

### Create Sklearn SageMaker Processing script

In [None]:
%%writefile preprocess.py

import argparse
import glob
import json
import logging
import os
import subprocess
import sys
import time
import uuid
from zipfile import ZipFile

# Install the SageMaker Python SDK in the processing container before importing it
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker'])

import boto3  # noqa: E402

n_cores = os.cpu_count()

# Install geopandas dependency before including pandas
subprocess.check_call([sys.executable, "-m", "pip", "install", "geopandas==0.9.0"])

import pandas as pd  # noqa: E402
import geopandas as gpd  # noqa: E402
from sklearn.model_selection import train_test_split  # noqa: E402

import sagemaker  # noqa: E402
from sagemaker.feature_store.feature_group import FeatureGroup  # noqa: E402

def get_session(region, default_bucket):
    """Gets the SageMaker session based on the region."""
    boto_session = boto3.Session(region_name=region)

    sagemaker_client = boto_session.client("sagemaker")
    return sagemaker.session.Session(
        boto_session=boto_session,
        sagemaker_client=sagemaker_client,
        default_bucket=default_bucket,
    )


logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--ingest_featuregroup_name', type=str, default=None)
    parser.add_argument('--region', type=str)
    parser.add_argument('--bucket', type=str)
    parser.add_argument('--base_dir', type=str, default="/opt/ml/processing")
    args, _ = parser.parse_known_args()
    return args

def extract_zones(zones_file: str, zones_dir: str):
    logger.info(f"Extracting zone file: {zones_file}")
    with ZipFile(zones_file, "r") as zf:
        zf.extractall(zones_dir)


def load_zones(zones_dir: str):
    logger.info(f"Loading zones from {zones_dir}")
    # Load the shape file and get the geometry and lat/lon
    zone_df = gpd.read_file(os.path.join(zones_dir, "taxi_zones.shp"))
    # Get centroids projected to EPSG:3310 (units of meters) to measure distance
    zone_df["centroid"] = zone_df.geometry.centroid.to_crs(epsg=3310)
    # Convert coordinates to the WGS84 lat/long CRS, which has EPSG code 4326.
    # In a WGS84 point, x is the longitude and y is the latitude.
    centroid_wgs84 = zone_df["centroid"].to_crs(epsg=4326)
    zone_df["latitude"] = centroid_wgs84.y
    zone_df["longitude"] = centroid_wgs84.x
    return zone_df


def load_data(file_list: list):
    # Define dates, and columns to use
    use_cols = [
        "fare_amount",
        "lpep_pickup_datetime",
        "lpep_dropoff_datetime",
        "passenger_count",
        "PULocationID",
        "DOLocationID",
    ]
    # Concat input files with select columns
    dfs = []
    for file in file_list:
        df = pd.read_parquet(file, engine='pyarrow', columns=use_cols)
        df = df.fillna(0)
        df['passenger_count'] = df['passenger_count'].astype('int64')
        dfs.append(df)
    return pd.concat(dfs, ignore_index=True)


def enrich_data(trip_df: pd.DataFrame, zone_df: pd.DataFrame):
    # Join trip DF to zones for both pickup and drop off locations
    trip_df = gpd.GeoDataFrame(
        trip_df.join(zone_df, on="PULocationID").join(
            zone_df, on="DOLocationID", rsuffix="_DO", lsuffix="_PU"
        )
    )
    # Distance between zone centroids in EPSG:3310 meters, converted to kilometers
    trip_df["geo_distance"] = (
        trip_df["centroid_PU"].distance(trip_df["centroid_DO"]) / 1000
    )

    # Add date parts
    trip_df["lpep_pickup_datetime"] = pd.to_datetime(trip_df["lpep_pickup_datetime"])
    trip_df["hour"] = trip_df["lpep_pickup_datetime"].dt.hour
    trip_df["weekday"] = trip_df["lpep_pickup_datetime"].dt.weekday
    trip_df["month"] = trip_df["lpep_pickup_datetime"].dt.month

    # Get calculated duration in minutes; total_seconds() keeps whole days and negative spans
    trip_df["lpep_dropoff_datetime"] = pd.to_datetime(trip_df["lpep_dropoff_datetime"])
    trip_df["duration_minutes"] = (
        trip_df["lpep_dropoff_datetime"] - trip_df["lpep_pickup_datetime"]
    ).dt.total_seconds() / 60

    # Rename coordinate columns
    trip_df = trip_df.rename(
        columns={
            "latitude_PU": "pickup_latitude",
            "longitude_PU": "pickup_longitude",
            "latitude_DO": "dropoff_latitude",
            "longitude_DO": "dropoff_longitude",
        }
    )

    # Record identifier and event time required by the feature group
    trip_df['FS_ID'] = trip_df.index + 1000
    current_time_sec = int(round(time.time()))
    trip_df["FS_time"] = pd.Series([current_time_sec] * len(trip_df), dtype="float64")
    return trip_df


def clean_data(trip_df: pd.DataFrame):
    # Remove outliers
    trip_df = trip_df[
        (trip_df.fare_amount > 0)
        & (trip_df.fare_amount < 200)
        & (trip_df.passenger_count > 0)
        & (trip_df.duration_minutes > 0)
        & (trip_df.duration_minutes < 120)
        & (trip_df.geo_distance > 0)
        & (trip_df.geo_distance < 121)
    ].dropna()

    # Filter columns
    cols = [
        "fare_amount",
        "passenger_count",
        "pickup_latitude",
        "pickup_longitude",
        "dropoff_latitude",
        "dropoff_longitude",
        "geo_distance",
        "hour",
        "weekday",
        "month",
    ]

    # Training columns plus the record identifier and event time for the feature group
    cols_fg = cols + ["FS_ID", "FS_time"]
    return trip_df[cols], trip_df[cols_fg]

def ingest_data(data_fg: pd.DataFrame, fg_name: str, sagemaker_session) -> None:
    # 4 threads per Python process, one process per CPU core
    num_workers = 4
    num_processes = n_cores
    logger.info(f'Ingesting into feature group [{fg_name}] using {num_processes} processes and {num_workers} workers')
    fg = FeatureGroup(name=fg_name, sagemaker_session=sagemaker_session)
    response = fg.ingest(data_frame=data_fg, max_processes=num_processes, max_workers=num_workers, wait=True)
    # The ingest call above returns an IngestionManagerPandas instance. Zero-based indices
    # of rows that failed to be ingested are captured in its failed_rows attribute.
    # Asserting that this count is 0 validates that all rows were ingested without a failure.
    assert len(response.failed_rows) == 0


def save_files(base_dir: str, data_df: pd.DataFrame, data_fg: pd.DataFrame, fg_name: str,
               val_size=0.2, test_size=0.05, current_host=None, sagemaker_session=None):

    logger.info(f"Splitting {len(data_df)} rows of data into train, val, test.")

    # Hold out val_size of the rows, then take test_size of that holdout as the test set
    train_df, val_df = train_test_split(data_df, test_size=val_size, random_state=42)
    val_df, test_df = train_test_split(val_df, test_size=test_size, random_state=42)

    logger.info(f"Writing out datasets to {base_dir}")
    # The host name plus a random suffix keeps file names unique across instances
    tmp_id = uuid.uuid4().hex[:8]
    # All splits are written without header or index
    train_df.to_csv(f"{base_dir}/train/train_{current_host}_{tmp_id}.csv", header=False, index=False)
    val_df.to_csv(f"{base_dir}/validation/validation_{current_host}_{tmp_id}.csv", header=False, index=False)
    test_df.to_csv(f"{base_dir}/test/test_{current_host}_{tmp_id}.csv", header=False, index=False)

    if fg_name:
        # Batch-ingest all cleaned rows of this instance into the feature group
        ingest_data(data_fg, fg_name, sagemaker_session)

def _read_json(path):  # type: (str) -> dict
    """Read a JSON file.

    Args:
        path (str): Path to the file.
    Returns:
        (dict[object, object]): A dictionary representation of the JSON file.
    """
    with open(path, "r") as f:
        return json.load(f)

def main(base_dir: str, args: argparse.Namespace):
    # Input data files
    input_dir = os.path.join(base_dir, "input/data")
    input_file_list = glob.glob(f"{input_dir}/*.parquet")
    logger.info(f"Input file list: {input_file_list}")

    if len(input_file_list) == 0:
        raise Exception(f"No input files found in {input_dir}")

    # The resource config identifies the current instance in a multi-instance job
    hosts = _read_json("/opt/ml/config/resourceconfig.json")
    logger.info(hosts)
    current_host = hosts["current_host"]
    logger.info(current_host)

    # Input zones file
    zones_dir = os.path.join(base_dir, "input/zones")
    zones_file = os.path.join(zones_dir, "taxi_zones.zip")
    if not os.path.exists(zones_file):
        raise Exception(f"Zones file {zones_file} does not exist")

    # Extract and load taxi zones geopandas dataframe
    extract_zones(zones_file, zones_dir)
    zone_df = load_zones(zones_dir)

    # Load input files
    data_df = load_data(input_file_list)
    data_df = enrich_data(data_df, zone_df)
    data_df, data_fg = clean_data(data_df)

    fg_name = args.ingest_featuregroup_name

    sagemaker_session = get_session(args.region, args.bucket)

    return save_files(base_dir, data_df, data_fg, fg_name, current_host=current_host, sagemaker_session=sagemaker_session)


if __name__ == "__main__":
    logger.info("Starting preprocessing.")
    args = parse_args()
    base_dir = args.base_dir
    main(base_dir, args)
    logger.info("Done")
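`enrich_data` derives `duration_minutes` from the dropoff minus pickup difference. A timedelta's `seconds` attribute holds only the seconds component (0 to 86399) left after whole days are split off, and pandas' `.dt.seconds` behaves the same way, whereas `total_seconds()` returns the whole span. This stdlib sketch, using `datetime` in place of pandas, shows the difference for a trip spanning more than a day and for one whose dropoff precedes its pickup:

```python
from datetime import datetime, timedelta

pickup = datetime(2020, 1, 1, 8, 0)

# A trip recorded as lasting one day and 10 minutes
long_trip = (pickup + timedelta(days=1, minutes=10)) - pickup
print(long_trip.seconds / 60)          # 10.0 (whole days are discarded)
print(long_trip.total_seconds() / 60)  # 1450.0

# A trip whose dropoff timestamp is 5 minutes before its pickup
backwards = (pickup - timedelta(minutes=5)) - pickup
print(backwards.seconds / 60)          # 1435.0 (stored as -1 day + 86100 seconds)
print(backwards.total_seconds() / 60)  # -5.0
```

With the 0 to 120 minute filter in `clean_data`, `.seconds` would keep the first trip as a 10-minute ride, while `total_seconds()` lets the filter drop it.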


In [None]:
process_script = os.getcwd() + '/preprocess.py'
%store process_script
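In `preprocess.py`, `geo_distance` is the distance between zone centroids projected to EPSG:3310, divided by 1000 to give kilometers. As a rough, independent sanity check (not the method the script uses), the great-circle distance between the WGS84 pickup and dropoff coordinates should be of a similar magnitude. A haversine sketch, assuming a spherical Earth with a mean radius of 6371 km; `haversine_km` is a hypothetical helper:

```python
import math

EARTH_RADIUS_KM = 6371.0  # mean Earth radius; assumes a spherical Earth


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Great-circle distance in kilometers between two WGS84 points given in degrees."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlmb / 2) ** 2
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


# One degree of latitude is about 111.19 km on this sphere
print(round(haversine_km(40.0, -74.0, 41.0, -74.0), 2))  # 111.19
```

Applied to a few rows of the training output, large disagreements with `geo_distance` would point to swapped or mis-projected coordinates.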

Before starting the SageMaker Processing job, we instantiate a `SKLearnProcessor` object. This object lets you specify the instance type to use in the job and the number of instances.

In [None]:
from sagemaker.sklearn.processing import SKLearnProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.utils import name_from_base

base_job_name = 'sm-tripfare-processing'
sklearn_processor = SKLearnProcessor(
 framework_version="0.23-1",
 role=get_execution_role(),
 instance_type="ml.m5.4xlarge",
 instance_count=2, 
 base_job_name=base_job_name,
)

processing_job_name = name_from_base(base_job_name)
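With `instance_count=2` and `ShardedByS3Key`, each instance processes its own subset of the Parquet files, and `enrich_data` assigns `FS_ID = index + 1000` on each instance independently. Both instances therefore start their IDs at 1000, and because the online store keeps only the latest record per identifier, colliding IDs can overwrite each other. The plain-Python sketch below shows the collision and one possible way to reserve a disjoint ID block per host; `assign_ids`, `assign_ids_per_host`, and the block scheme are hypothetical, not part of the lab:

```python
def assign_ids(n_rows: int, start: int = 1000) -> list:
    """Mimic FS_ID = index + 1000 for one instance's DataFrame with a 0-based index."""
    return [i + start for i in range(n_rows)]


# Two instances each process their own shard
host_a_ids = assign_ids(3)
host_b_ids = assign_ids(2)
print(sorted(set(host_a_ids) & set(host_b_ids)))  # [1000, 1001]


def assign_ids_per_host(n_rows: int, host_rank: int, block_size: int = 10_000_000, start: int = 1000) -> list:
    """Hypothetical scheme: give each host a disjoint block of IDs."""
    if n_rows > block_size:
        raise ValueError("block_size is too small for this shard")
    base = start + host_rank * block_size
    return [base + i for i in range(n_rows)]


# host_rank could be derived from the host list in resourceconfig.json (assumption)
a = assign_ids_per_host(3, host_rank=0)
b = assign_ids_per_host(2, host_rank=1)
print(set(a) & set(b))  # set()
```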


In [None]:
train_path = f"s3://{bucket}/{prefix}/train/{processing_job_name}"
validation_path = f"s3://{bucket}/{prefix}/validation/{processing_job_name}"
test_path = f"s3://{bucket}/{prefix}/test/{processing_job_name}"
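`save_files` in `preprocess.py` splits in two stages: `val_size=0.2` first holds out 20% of the rows, then `test_size=0.05` takes 5% of that holdout as the test set. The test set is therefore only about 1% of each instance's rows (0.2 × 0.05), and validation about 19%. The sketch below computes the expected sizes, assuming scikit-learn's behavior of rounding a fractional `test_size` up with `ceil` and giving the remainder to the first returned split; `split_sizes` is a hypothetical helper:

```python
import math


def split_sizes(n_rows: int, val_size: float = 0.2, test_size: float = 0.05) -> dict:
    """Expected row counts from the two-stage train_test_split in save_files."""
    holdout = math.ceil(val_size * n_rows)  # rows moved out of train in the first split
    test = math.ceil(test_size * holdout)   # rows moved out of validation in the second split
    return {"train": n_rows - holdout, "validation": holdout - test, "test": test}


print(split_sizes(10_000))  # {'train': 8000, 'validation': 1900, 'test': 100}
```

Since each instance splits its own shard, the files under `train_path`, `validation_path`, and `test_path` each hold one part per instance.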


In [None]:

sklearn_processor.run(
 code='preprocess.py',
 arguments = [
 '--ingest_featuregroup_name', tripfare_feature_group_name,
 '--region', region,
 '--bucket', bucket,
 ],
 inputs=[
 ProcessingInput(
 source=input_data,
 destination="/opt/ml/processing/input/data",
 s3_data_distribution_type="ShardedByS3Key",
 ),
 ProcessingInput(
 source=input_zones,
 destination="/opt/ml/processing/input/zones",
 s3_data_distribution_type="FullyReplicated",
 ),
 ],
 outputs=[
 ProcessingOutput(output_name="train", source="/opt/ml/processing/train", destination=train_path),
 ProcessingOutput(output_name="validation", source="/opt/ml/processing/validation", destination=validation_path),
 ProcessingOutput(output_name="test", source="/opt/ml/processing/test", destination=test_path),
 ],
 job_name=processing_job_name,

)

In [None]:
%store train_path
%store validation_path
%store test_path

# Get a feature record from the online feature store

In [None]:
featurestore_runtime_client = sagemaker_session.boto_session.client('sagemaker-featurestore-runtime', region_name=region)

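Retrieve a record from the trip fare feature group. Only rows that survived outlier filtering were ingested, so a randomly chosen ID may not exist; in that case the response contains no `Record` data.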

In [None]:
from random import randint
feature_id = f'{randint(1000, 10000)}'
print(f'feature_id={feature_id}') 

In [None]:
feature_record = featurestore_runtime_client.get_record(FeatureGroupName=tripfare_feature_group_name, 
 RecordIdentifierValueAsString=feature_id)
feature_record
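`get_record` returns the features as a list under the `Record` key, each entry holding a `FeatureName` and a `ValueAsString`. If the identifier is not found, the API returns an empty result without a `Record` key. A small sketch that turns the response into a dictionary and handles the missing case; `record_to_dict` is a hypothetical helper, and the sample response is made up for illustration:

```python
def record_to_dict(response: dict):
    """Convert a GetRecord response into {feature_name: value_as_string}, or None if absent."""
    record = response.get("Record")
    if not record:
        return None
    return {item["FeatureName"]: item["ValueAsString"] for item in record}


# Made-up response in the GetRecord shape (values are always strings)
sample = {
    "Record": [
        {"FeatureName": "fare_amount", "ValueAsString": "12.5"},
        {"FeatureName": "FS_ID", "ValueAsString": "1234"},
    ]
}
print(record_to_dict(sample))                    # {'fare_amount': '12.5', 'FS_ID': '1234'}
print(record_to_dict({"ResponseMetadata": {}}))  # None
```

In the notebook, `record_to_dict(feature_record)` gives a readable view of the retrieved record; numeric features still need casting, for example `float(...)`.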

# List feature groups 

In [None]:
sagemaker_client = sagemaker_session.boto_session.client('sagemaker', region_name=region)

In [None]:
response = sagemaker_client.list_feature_groups()
for fg in response['FeatureGroupSummaries']:
 fg_name = fg['FeatureGroupName']
 print(f'Found feature group: {fg_name}')
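`list_feature_groups` returns results one page at a time, with a `NextToken` when more remain, so the loop above prints only the first page. The sketch below follows `NextToken` and filters with the `NameContains` parameter, which suits the `fscw-` prefix used in this lab. To keep it runnable without AWS, `FakeClient` is a hypothetical stand-in for the boto3 SageMaker client:

```python
def list_feature_group_names(client, name_contains: str = "") -> list:
    """Collect feature group names across all pages of ListFeatureGroups."""
    names = []
    kwargs = {"NameContains": name_contains} if name_contains else {}
    while True:
        response = client.list_feature_groups(**kwargs)
        names.extend(fg["FeatureGroupName"] for fg in response["FeatureGroupSummaries"])
        token = response.get("NextToken")
        if not token:
            return names
        kwargs["NextToken"] = token


class FakeClient:
    """Hypothetical stand-in that serves two pages in the boto3 response shape."""

    def __init__(self):
        self.pages = {
            None: {"FeatureGroupSummaries": [{"FeatureGroupName": "fscw-tripfare-01"}], "NextToken": "p2"},
            "p2": {"FeatureGroupSummaries": [{"FeatureGroupName": "fscw-tripfare-02"}]},
        }

    def list_feature_groups(self, **kwargs):
        return self.pages[kwargs.get("NextToken")]


print(list_feature_group_names(FakeClient(), name_contains="fscw-"))
# ['fscw-tripfare-01', 'fscw-tripfare-02']

# In the notebook, once you no longer need the lab's feature groups:
# for name in list_feature_group_names(sagemaker_client, name_contains=fs_prefix):
#     sagemaker_client.delete_feature_group(FeatureGroupName=name)
```

Deleting a feature group removes its online store, but data already written to the offline store in S3 is not deleted.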

---

## End of Lab 1