<div class="alert alert-warning"> ⚠️ <strong> PRE-REQUISITE: </strong> Please select the `Data Science 3.0` image with the `Python 3` kernel and the `ml.t3.medium` instance type for this SageMaker Studio notebook.</div>

## Introduction

The first stage in the ML workflow is data preparation. In this notebook, we will explore data preparation on SageMaker using Ray. Ray is an open-source distributed computing framework designed to accelerate and simplify the development of scalable and efficient machine learning applications. It provides a simple and flexible API for defining and executing [tasks](https://docs.ray.io/en/latest/ray-core/tasks.html) and [actors](https://docs.ray.io/en/latest/ray-core/actors.html) on a cluster of machines, allowing you to scale your machine learning workloads from a single machine to thousands of nodes.

Here, we will put on the hat of a `Data Scientist`/`Data Engineer`: we will gather the data, load it into a feature store, and pre-process the features to align with our upcoming training needs. As part of this exercise, we will learn how to scale these steps using managed SageMaker Processing capabilities. In the last step, we will split the dataset into training, validation and test sets to be used in our ML workflow.

![Notebook1](./images/notebook-1.png)

To enable you to run these notebooks within a reasonable time (typically less than an hour), the use case is a straightforward regression task: predicting house prices based on a synthetic housing dataset. The dataset contains 8 housing features, such as year built, number of bedrooms and lot size, plus the target column `PRICE`.

To begin, we'll import some necessary packages and set up directories for local training and test data. We'll also set up a SageMaker Session to perform various operations, and specify an Amazon S3 bucket to hold input data and output. The default bucket used here is created by SageMaker if it doesn't already exist, and named in accordance with the AWS account ID and AWS Region.
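For reference, unless a different default bucket is configured, the SageMaker Python SDK names this bucket `sagemaker-{region}-{account_id}`. The helper below is hypothetical and only reproduces that naming convention; in the notebook, `sess.default_bucket()` is what actually looks up or creates the bucket.

```python
def default_bucket_name(region: str, account_id: str) -> str:
    """Hypothetical helper: rebuild the SDK's default bucket name from its parts."""
    return f"sagemaker-{region}-{account_id}"


# Example with a placeholder account ID
print(default_bucket_name("us-east-1", "123456789012"))
```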

Let's get started!

In [None]:
!pip install -U sagemaker ray "modin[ray]" pydantic==1.10.10

In [None]:
import os
import time
from time import strftime

import boto3
import pandas as pd
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.sklearn.processing import SKLearnProcessor

import ray
from ray.data import Dataset
# Ray's StandardScaler operates on ray.data.Dataset objects
from ray.data.preprocessors import StandardScaler


In [None]:
local_data_path_ray = "data/raw/ray/house_pricing.csv"
# Split proportions for train, validation and test; they are converted to strings
# when passed to the processing job, since job arguments must be strings
train_size = 0.6
val_size = 0.2
test_size = 0.2
random_seed = 42 # fixed random seed so that the split is reproducible across runs

# Useful SageMaker variables
sess = sagemaker.Session()
bucket = sess.default_bucket()
role_arn = sagemaker.get_execution_role()
region = sess.boto_region_name

# Local directories for the processing scripts written below.
# %%writefile does not create missing directories, so create the script subfolders up front.
pipeline_scripts_dir = os.path.join(os.getcwd(), 'pipeline_scripts')
os.makedirs(os.path.join(pipeline_scripts_dir, 'feature-store'), exist_ok=True)
os.makedirs(os.path.join(pipeline_scripts_dir, 'preprocessing'), exist_ok=True)

# Local data paths
processed_dir = os.path.join(os.getcwd(), 'data/processed')
os.makedirs(processed_dir, exist_ok=True)

processed_train_dir = os.path.join(processed_dir, 'train')
os.makedirs(processed_train_dir, exist_ok=True)

processed_validation_dir = os.path.join(processed_dir, 'validation')
os.makedirs(processed_validation_dir, exist_ok=True)

processed_test_dir = os.path.join(processed_dir, 'test')
os.makedirs(processed_test_dir, exist_ok=True)

# Data paths in S3
s3_prefix = 'aws-sm-ray-workshop'

# SageMaker Processing variables
processing_instance_type = 'ml.m5.2xlarge'
processing_instance_count = 1
output_path = f's3://{bucket}/{s3_prefix}/data/sm_processed'

## Starting Ray on a single machine

`ray.init()` will attempt to find a Ray instance to connect to automatically. It follows these steps in order:

1. It checks for the `RAY_ADDRESS` OS environment variable.
2. It looks for the specific address passed to `ray.init(address=<address>)`.
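3. If no address is provided, it connects to the most recent Ray instance that was launched on the same machine using `ray start`.

If no running instance is found, `ray.init()` starts a new local Ray instance, which is what happens in the next code cell.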
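The lookup order above can be modeled as a small function. This only illustrates the order as listed here and is not Ray's implementation; `resolve_ray_address` and its parameters are hypothetical.

```python
import os
from typing import Mapping, Optional


def resolve_ray_address(
    env: Mapping[str, str],
    address: Optional[str] = None,
    latest_local_cluster: Optional[str] = None,
) -> str:
    """Hypothetical model of the ray.init() lookup order described above."""
    # 1. The RAY_ADDRESS environment variable
    if env.get("RAY_ADDRESS"):
        return env["RAY_ADDRESS"]
    # 2. An explicit ray.init(address=...) argument
    if address:
        return address
    # 3. The most recent instance started on this machine with `ray start`
    if latest_local_cluster:
        return latest_local_cluster
    # Nothing found: ray.init() starts a new local Ray instance
    return "new-local-instance"


# The result depends on your environment variables
print(resolve_ray_address(os.environ))
```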


In [None]:
ray.init(include_dashboard=False)

In [None]:
# test reading data using ray.data
dataset = ray.data.read_csv(local_data_path_ray)
dataset

In [None]:
# Upload raw data to S3
raw_data_s3_prefix = '{}/data/raw'.format(s3_prefix)
raw_s3 = sess.upload_data(path=local_data_path_ray, key_prefix=raw_data_s3_prefix)

`raw_s3` now holds the S3 URI of the uploaded raw CSV file. The Feature Store ingestion job below reads its input from this location.

In [None]:
raw_s3

## SageMaker Feature Store
    
Features are inputs to ML models used during training and inference. The same features are often reused by multiple teams, and feature quality is critical to model accuracy. In addition, when features used to train models offline in batch are also needed for real-time inference, it is hard to keep the two feature stores synchronized. [SageMaker Feature Store](https://docs.aws.amazon.com/sagemaker/latest/dg/feature-store.html) provides a secure, unified store for features across the ML lifecycle.

Rather than reading the raw CSV straight from S3 for preprocessing, let's first ingest it into SageMaker Feature Store and use the feature store as the source for the preprocessing step.

![Notebook1](images/notebook-1fs.png)

**Feature Groups**

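First, we need a feature group to hold the housing features, with its offline store under an S3 prefix in the default bucket. The feature group itself is created by the ingestion script further down. The optional cell below deletes feature groups left over from earlier runs of this workshop.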
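The ingestion script further down declares one `FeatureDefinitions` entry per column by hand. As an illustration, a hypothetical helper (`build_feature_definitions`, not part of the SageMaker SDK) could derive the same kind of list from a `{column: dtype}` mapping, such as the one `df.dtypes.astype(str).to_dict()` returns in pandas. The dtype-to-type mapping is an assumption of this sketch; Feature Store's feature types are `Integral`, `Fractional` and `String`, and you may still want to override individual types.

```python
from typing import Dict, List

# Assumed mapping from pandas dtype names to Feature Store feature types
DTYPE_TO_FEATURE_TYPE = {
    "int64": "Integral",
    "int32": "Integral",
    "float64": "Fractional",
    "float32": "Fractional",
    "object": "String",
}


def build_feature_definitions(column_dtypes: Dict[str, str]) -> List[Dict[str, str]]:
    """Turn {column: dtype name} into a FeatureDefinitions list for create_feature_group."""
    definitions = []
    for name, dtype in column_dtypes.items():
        # Fall back to String for dtypes the mapping does not cover
        feature_type = DTYPE_TO_FEATURE_TYPE.get(dtype, "String")
        definitions.append({"FeatureName": name, "FeatureType": feature_type})
    return definitions


schema = {"record_id": "int64", "event_time": "float64", "SQUARE_FEET": "float64", "PRICE": "int64"}
print(build_feature_definitions(schema))
```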

In [None]:
# Optional step
# Delete all feature groups whose names contain "fs-ray-".
# This prevents feature groups from piling up when you run this workshop multiple times.
# Note: deleting a feature group does not delete data already written to its offline store in S3.

sm_client = boto3.client('sagemaker', region_name=region)
response = sm_client.list_feature_groups(
    NameContains='fs-ray-'
)

for feature in response["FeatureGroupSummaries"]:
    print(f'deleting {feature["FeatureGroupName"]}')
    resp = sm_client.delete_feature_group(
        FeatureGroupName=feature["FeatureGroupName"]
    )

## SageMaker Processing
    
To process large amounts of data, we fortunately don't need to provision and manage a processing cluster ourselves. Instead, we can use [SageMaker Processing](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job.html), which runs the processing _outside_ of this notebook's resources and applies our processing script to the input data files in parallel on SageMaker Processing instances.
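How the input files reach each instance is controlled by `s3_data_distribution_type` on `ProcessingInput`. With the default, `FullyReplicated`, every instance gets a full copy; with `ShardedByS3Key`, which the Feature Store job below uses, each instance gets a subset of the S3 objects. The sketch below mimics sharding with a round-robin rule. That rule is an assumption for illustration, not SageMaker's actual assignment, and with a single input file on a single instance, as in this notebook, sharding has no visible effect.

```python
from typing import Dict, List


def shard_s3_keys(keys: List[str], instance_count: int) -> Dict[int, List[str]]:
    """Illustrative only: spread S3 object keys across instances round-robin."""
    shards: Dict[int, List[str]] = {i: [] for i in range(instance_count)}
    for index, key in enumerate(sorted(keys)):
        # Each key goes to exactly one instance, so no file is processed twice
        shards[index % instance_count].append(key)
    return shards


print(shard_s3_keys(["part-2.csv", "part-0.csv", "part-1.csv"], instance_count=2))
```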
    
Keep in mind that in a typical SageMaker workflow, notebooks are only used for initial model development activities and can be run on relatively inexpensive and less powerful instances. However, to run similar tasks at scale, data scientists require access to more powerful SageMaker managed compute instances for data preparation, training, and model hosting tasks. 

SageMaker Processing includes off-the-shelf support for [scikit-learn](https://docs.aws.amazon.com/sagemaker/latest/dg/use-scikit-learn-processing-container.html), [PySpark](https://docs.aws.amazon.com/sagemaker/latest/dg/use-spark-processing-container.html), and [other frameworks](https://docs.aws.amazon.com/sagemaker/latest/dg/processing-job-frameworks.html) like Hugging Face, MXNet, PyTorch, TensorFlow, and XGBoost. You can even bring your own container if none of the built-in containers suits your use case.

## Processing data using Modin[ray]

`Modin[Ray]` is an open-source Python library that integrates with Ray to provide a scalable and efficient way to process large datasets. It uses parallel and distributed computing to accelerate data analysis tasks, making it easier to work with big data.
No prior knowledge of hardware resources or data distribution is required when using Modin. It is designed as a drop-in replacement for pandas: in most cases you only change the import statement to `import modin.pandas as pd` and keep the rest of your pandas code. It can speed up workloads even on a single machine by using all available CPU cores.

Note that the `Featurestore` class in the script below is decorated with `@ray.remote`, which makes it a Ray actor. An actor in Ray is similar to an object in object-oriented programming, with some important differences. Each actor runs in its own worker process and keeps its own state, which can include variables and data structures, and its methods are invoked remotely. Calling `actor.method.remote(...)` returns an `ObjectRef` immediately instead of blocking, and `ray.get()` retrieves the result later, so many actors can work concurrently. This mechanism is built on top of Ray's underlying task execution engine, enabling efficient and scalable distributed computing.
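To show the actor idea without a Ray cluster, here is a conceptual analogue built only from the standard library. It is not Ray. The object owns private state, each method call returns a future immediately, and a single worker thread processes the calls one at a time in the order they were sent. `CounterActor` is a hypothetical name.

```python
import queue
import threading
from concurrent.futures import Future


class CounterActor:
    """Conceptual analogue of a Ray actor using the standard library (not Ray)."""

    def __init__(self):
        self._count = 0  # private state, only touched by the worker thread
        self._mailbox = queue.Queue()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self):
        # Process one message at a time, in the order the messages were sent
        while True:
            method, args, future = self._mailbox.get()
            if method is None:  # stop signal
                break
            try:
                future.set_result(method(*args))
            except Exception as exc:
                future.set_exception(exc)

    def _send(self, method, *args) -> Future:
        future = Future()
        self._mailbox.put((method, args, future))
        return future  # returned immediately, similar to an ObjectRef

    def _add(self, n):
        self._count += n
        return self._count

    def add(self, n) -> Future:
        return self._send(self._add, n)

    def stop(self):
        self._mailbox.put((None, (), None))
        self._thread.join()


actor = CounterActor()
futures = [actor.add(i) for i in range(1, 5)]
print([f.result() for f in futures])  # [1, 3, 6, 10]
actor.stop()
```

In Ray itself, the equivalent calls are `Featurestore.remote()` to create the actor, `actor.ingest_features.remote(...)` to send a call, and `ray.get(...)` to wait for results, as the script below does.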

In [None]:
%%writefile ./pipeline_scripts/feature-store/script-fs.py

import subprocess
import sys
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker', 'ray', 'modin[ray]', 'pydantic==1.10.10'])

import argparse
import glob
import os
import time

import boto3
import modin.pandas as pd
import numpy as np
import ray
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup

########### BEGIN ACTOR #############

@ray.remote(num_cpus=0.5)
class Featurestore:
    
    def __init__(self):
        pass
    
    def ingest_features(self, feature_group_name, df, region):
        """
        Ingest the rows of a dataframe partition into a Feature Store feature group
        Args:
            feature_group_name (str): Feature Group Name
            df (DataFrame): partition of the data to ingest, one record per row
            region (str): AWS Region of the feature group
        """
        featurestore_runtime_client = boto3.client('sagemaker-featurestore-runtime', region_name=region)
        for index, row in df.iterrows(): 
            try:
                featurestore_runtime_client.put_record(
                    FeatureGroupName=feature_group_name,
                    Record=[
                        {
                            'FeatureName': 'record_id',
                            'ValueAsString': str(int(row['record_id']))
                        },
                        {
                            'FeatureName': 'event_time',
                            'ValueAsString': str(row['event_time'])
                        },
                        {
                            'FeatureName': 'NUM_BATHROOMS',
                            'ValueAsString': str(row['NUM_BATHROOMS'])
                        },
                        {
                            'FeatureName': 'NUM_BEDROOMS',
                            'ValueAsString': str(row['NUM_BEDROOMS'])
                        },
                        {
                            'FeatureName': 'FRONT_PORCH',
                            'ValueAsString': str(row['FRONT_PORCH'])
                        },
                        {
                            'FeatureName': 'LOT_ACRES',
                            'ValueAsString': str(row['LOT_ACRES'])
                        },
                        {
                            'FeatureName': 'DECK',
                            'ValueAsString': str(row['DECK'])
                        },
                        {
                            'FeatureName': 'SQUARE_FEET',
                            'ValueAsString': str(row['SQUARE_FEET'])
                        },
                        {
                            'FeatureName': 'YEAR_BUILT',
                            'ValueAsString': str(row['YEAR_BUILT'])
                        },
                        {
                            'FeatureName': 'GARAGE_SPACES',
                            'ValueAsString': str(row['GARAGE_SPACES'])
                        },
                        {
                            'FeatureName': 'PRICE',
                            'ValueAsString': str(int(row['PRICE']))
                        },
                    ],
                    TargetStores=[
                        'OfflineStore'
                    ]
                )
            except Exception as e:
                print(f"An error occurred: {e}\nFailed to process record number {index} for feature group {feature_group_name}")

        return
    
########### END ACTOR #############


def wait_for_feature_group_creation_complete(feature_group):
    """
    Function that waits for feature group to be created in SageMaker Feature Store
    Args:
        feature_group (sagemaker.feature_store.feature_group.FeatureGroup): Feature Group
    """
    status = feature_group.describe().get('FeatureGroupStatus')
    print(f'Initial status: {status}')
    while status == 'Creating':
        print(f'Waiting for feature group: {feature_group.name} to be created ...')
        time.sleep(5)
        status = feature_group.describe().get('FeatureGroupStatus')
    if status != 'Created':
        raise SystemExit(f'Failed to create feature group {feature_group.name}: {status}')
    print(f'FeatureGroup {feature_group.name} was successfully created.')

def create_feature_group(feature_group_name, prefix, role_arn, region):
    """
    Create a Feature Store feature group if it does not already exist, and wait until it is ready
    Args:
        feature_group_name (str): Feature Group Name
        prefix (str): S3 key prefix (within the default bucket) for the offline store
        role_arn (str): role ARN that Feature Store uses to write to the offline store
        region (str): AWS Region in which to create the feature group
    Returns:
        fs_group (sagemaker.feature_store.feature_group.FeatureGroup): Feature Group
    """
    sm_client = boto3.client('sagemaker', region_name=region)
    sagemaker_session = sagemaker.Session(boto3.Session(region_name=region))

    default_bucket = sagemaker_session.default_bucket()

    # Search to see if the Feature Group already exists
    results = sm_client.search(
        Resource="FeatureGroup",
        SearchExpression={
            'Filters': [
                {
                    'Name': 'FeatureGroupName',
                    'Operator': 'Equals',
                    'Value': feature_group_name
                },
            ]
        }
    )

    # If a FeatureGroup was not found with the name, create one
    if not results['Results']:
        sm_client.create_feature_group(
            FeatureGroupName=feature_group_name,
            RecordIdentifierFeatureName='record_id',
            EventTimeFeatureName='event_time',
            OnlineStoreConfig={
                "EnableOnlineStore": False
            },
            OfflineStoreConfig={
                "S3StorageConfig": {
                    "S3Uri": f's3://{default_bucket}/{prefix}', 
                }, 
            },
            FeatureDefinitions=[
                {
                    'FeatureName': 'record_id',
                    'FeatureType': 'Integral'
                },
                {
                    'FeatureName': 'event_time',
                    'FeatureType': 'Fractional'
                },
                {
                    'FeatureName': 'NUM_BATHROOMS',
                    'FeatureType': 'Fractional'
                },
                {
                    'FeatureName': 'NUM_BEDROOMS',
                    'FeatureType': 'Fractional'
                },
                {
                    'FeatureName': 'FRONT_PORCH',
                    'FeatureType': 'Fractional'
                },
                {
                    'FeatureName': 'LOT_ACRES',
                    'FeatureType': 'Fractional'
                },
                {
                    'FeatureName': 'DECK',
                    'FeatureType': 'Fractional'
                },
                {
                    'FeatureName': 'SQUARE_FEET',
                    'FeatureType': 'Fractional'
                },
                {
                    'FeatureName': 'YEAR_BUILT',
                    'FeatureType': 'Fractional'
                },
                {
                    'FeatureName': 'GARAGE_SPACES',
                    'FeatureType': 'Fractional'
                },
                {
                    'FeatureName': 'PRICE',
                    'FeatureType': 'Integral'
                },
            ],
            RoleArn=role_arn
        )

    fs_group = FeatureGroup(
        name=feature_group_name, 
        sagemaker_session=sagemaker_session
    )

    wait_for_feature_group_creation_complete(fs_group)
    return fs_group


def prepare_df_for_feature_store(df):
    """
    Add event time and record id to df in order to store it in SageMaker Feature Store
    Args:
        df (pandas.DataFrame): data to be prepared
    Returns:
        df (pandas.DataFrame): dataframe with event time and record id
    """
    print(f'Preparing data for Feature Store..')
    current_time_sec = int(round(time.time()))
    # create event time; assigning a scalar sets the same value on every row, independent of the index
    df['event_time'] = float(current_time_sec)
    # create record id from the row position
    df['record_id'] = df.reset_index().index
    return df

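def read_csv(path, num_actors):
    """
    Read all the CSV files within a given directory and split the data into partitions
    IMPORTANT: All CSVs should have the same schema
    Args:
        path (str): the directory in which the input files exist
        num_actors (int): number of partitions to create, one per actor
    Returns:
        partitions (list of ray.ObjectRef): references to the dataframe partitions in the Ray object store
    """
    
    csv_files = glob.glob(os.path.join(path, "*.csv"))
    print(f"found {len(csv_files)} files")
    frames = []
    # loop over the list of csv files
    for f in csv_files:
        # read the csv file
        df = pd.read_csv(f)
        frames.append(df)

    # ignore_index gives the combined data a unique 0..n-1 index, even when several files are read
    data = pd.concat(frames, ignore_index=True)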
    data = prepare_df_for_feature_store(data)
    # Split into partitions
    partitions = [ray.put(part) for part in np.array_split(data, num_actors)]
    return partitions
    

def read_parameters():
    """
    Read job parameters
    Returns:
        (Namespace): read parameters
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--feature_group_name', type=str, default='fs-ray-synthetic-house-price')
    parser.add_argument('--bucket_prefix', type=str, default='aws-ray-mlops-workshop/feature-store')
    parser.add_argument('--num_actors', type=int, default=4)
    parser.add_argument('--region', type=str, default='us-east-1')
    parser.add_argument('--role_arn', type=str)
    params, _ = parser.parse_known_args()
    return params

start = time.time()      
print(f"===========================================================")
print(f"Starting Feature Store Ingestion")
print(f"Reading parameters")

ray.init(runtime_env={'env_vars': {'__MODIN_AUTOIMPORT_PANDAS__': '1'}})

# reading job parameters
args = read_parameters()
print(f"Parameters read: {args}")


create_feature_group(
    args.feature_group_name,
    f'{args.bucket_prefix}/synthetic-housing-price-data',
    args.role_arn,
    args.region
)


# set input path
input_data_path = "/opt/ml/processing/input/"

input_partitions = read_csv(input_data_path, args.num_actors)


# Start actors and assign partitions in a loop
actors = [Featurestore.remote() for _ in range(args.num_actors)]
results = []
for actor, partition in zip(actors, input_partitions):
    results.append(actor.ingest_features.remote(args.feature_group_name, partition, args.region))
    
#for actor_state in ray.util.state.list_actors():
#    print(actor_state.node_id, actor_state.pid, actor_state.name, actor_state.state)

ray.get(results)

taken = time.time() - start
print(f"Ending Feature Store Ingestion")
print(f"TOTAL TIME TAKEN: {taken:.2f} seconds")
print(f"===========================================================")
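The `put_record` call in the script above spells out each of the 11 features by hand. A more compact alternative is sketched below, using a hypothetical helper `build_record` that is not part of the SageMaker SDK. It builds the same `Record` payload from a row and converts integral features with `int()` first, as the script does for `record_id` and `PRICE`, so they are sent without a trailing `.0`.

```python
from typing import Any, Dict, Iterable, List


def build_record(row: Dict[str, Any], feature_names: Iterable[str],
                 integral_features: Iterable[str] = ()) -> List[Dict[str, str]]:
    """Hypothetical helper: build a put_record Record list from one row."""
    integral = set(integral_features)
    record = []
    for name in feature_names:
        value = row[name]
        # Integral features are stringified from an int, everything else as-is
        text = str(int(value)) if name in integral else str(value)
        record.append({"FeatureName": name, "ValueAsString": text})
    return record


row = {"record_id": 7.0, "event_time": 1700000000.0, "SQUARE_FEET": 1850.5, "PRICE": 412000.0}
print(build_record(row, row.keys(), integral_features=("record_id", "PRICE")))
```

Inside `ingest_features`, a row from `iterrows()` supports the same `row[name]` lookup, so the hand-written list could become something like `Record=build_record(row, FEATURE_NAMES, ("record_id", "PRICE"))`, where `FEATURE_NAMES` is a hypothetical list of the 11 feature names.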

In [None]:
sklearn_processor = SKLearnProcessor(
    framework_version='1.0-1',
    role=role_arn,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count
)

In [None]:
time_str = '-' + time.strftime('%Y-%m-%d-%H-%M-%S')
feature_group_name = "fs-ray-synthetic_home-price"+time_str
bucket_prefix = f'{s3_prefix}/data/feature-store'

### Ray parallel processing
Note that we pass the number of actors to the processing job as a parameter (`--num_actors`). It determines how many instances of the `Featurestore` actor run the ingestion in parallel. Also note the `num_cpus` parameter on the `@ray.remote` decorator at the top of the actor class. It controls how many logical CPUs each actor instance reserves, in this case 0.5 CPU. Because Ray accepts fractional values, two actors can share one CPU, which packs more actors onto each node. These reservations are used for scheduling only; Ray does not cap the CPU an actor actually consumes. Tuning fractional CPU resources lets you use a Ray cluster's resources more efficiently, and the best allocation balances parallelism, throughput and schedulability across all tasks.
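As a quick worked example, assume Ray detects all 8 vCPUs of the `ml.m5.2xlarge` processing instance as 8 logical CPUs, and ignore other work (such as Modin's tasks) competing for the same CPUs. With `num_cpus=0.5`, up to 16 actors fit on the node, so the 10 actors requested by the job reserve 5 CPUs and can all be scheduled at once. With `num_cpus=1`, only 8 would fit and the other 2 would wait. The helper names below are hypothetical.

```python
import math


def max_actors_per_node(node_cpus: float, cpus_per_actor: float) -> int:
    """How many actors with a given num_cpus fit into one node's logical CPUs."""
    # Small tolerance guards against float rounding in the division
    return math.floor(node_cpus / cpus_per_actor + 1e-9)


def cpus_reserved(num_actors: int, cpus_per_actor: float) -> float:
    """Total logical CPUs reserved by num_actors actors."""
    return num_actors * cpus_per_actor


print(max_actors_per_node(8, 0.5), cpus_reserved(10, 0.5))  # 16 5.0
print(max_actors_per_node(8, 1))  # 8
```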

In [None]:
from IPython.display import display, HTML

# Note: `code` can also be an S3 URI pointing to the processing script
job_name = f"processing-with-fs-{strftime('%Y-%m-%d-%H-%M-%S')}"

display(
    HTML(
        '<b>Review <a target="_blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/processing-jobs/{}">Feature Store Processing Job</a></b>'.format(
            region, job_name
        )
    )
)

sklearn_processor.run(
    code='pipeline_scripts/feature-store/script-fs.py',
    job_name=job_name,
    inputs=[
        ProcessingInput(
            source=raw_s3,
            destination='/opt/ml/processing/input',
            s3_data_distribution_type='ShardedByS3Key'
        )
    ],
    # notice that all arguments passed to a SageMaker processing job should be strings as they are transformed to command line parameters.
    # Your read_parameters function will handle the data types for your code 
    arguments=[
        "--feature_group_name", feature_group_name,
        "--num_actors", '10',
        "--bucket_prefix", bucket_prefix,
        "--role_arn", role_arn,
        "--region", region
    ]
)

preprocessing_job_description = sklearn_processor.jobs[-1].describe()

### Splitting and Scaling data

Now let's process our data for a Machine Learning model. 

We will create a script that splits the data into train, validation and test datasets using features available within the <mark>`ray.data`</mark> library, and then scales all columns other than the target column with the standard scaler from `ray.data.preprocessors`.
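Conceptually, the script shuffles with a fixed seed and cuts the data at cumulative proportions. Ray's `split_proportionately([0.6, 0.2])` returns three datasets, and the last one holds the remaining data. The script then fits the scaler on the training split only and reuses those statistics for validation and test, so no information leaks from them into training. The plain-Python sketch below illustrates these steps on a list of dicts. All function names are hypothetical, and the truncated split indices and population standard deviation are assumptions of this sketch, not a statement of how Ray computes them.

```python
import random
import statistics
from typing import Dict, List, Sequence, Tuple

Row = Dict[str, float]


def split_proportionately(rows: Sequence[Row], proportions: Sequence[float],
                          seed: int = 42) -> List[List[Row]]:
    """Shuffle rows, then cut at cumulative proportions; the remainder is the last split."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    splits, start, cumulative = [], 0, 0.0
    for p in proportions:
        cumulative += p
        end = int(len(shuffled) * cumulative)
        splits.append(shuffled[start:end])
        start = end
    splits.append(shuffled[start:])
    return splits


def fit_scaler(rows: Sequence[Row], columns: Sequence[str]) -> Dict[str, Tuple[float, float]]:
    """Learn (mean, std) per column from the training rows only."""
    stats = {}
    for col in columns:
        values = [r[col] for r in rows]
        std = statistics.pstdev(values)
        # A constant column would divide by zero; use 1.0 in that case
        stats[col] = (statistics.fmean(values), std if std > 0 else 1.0)
    return stats


def transform(rows: Sequence[Row], stats: Dict[str, Tuple[float, float]]) -> List[Row]:
    """Apply training statistics to any split; other columns (e.g. the target) pass through."""
    scaled = []
    for r in rows:
        out = dict(r)
        for col, (mean, std) in stats.items():
            out[col] = (r[col] - mean) / std
        scaled.append(out)
    return scaled


rows = [{"SQUARE_FEET": float(1000 + 10 * i), "PRICE": float(100000 + 1000 * i)} for i in range(10)]
train, val, test = split_proportionately(rows, [0.6, 0.2], seed=42)
stats = fit_scaler(train, ["SQUARE_FEET"])  # fit on the training split only
train_scaled, val_scaled, test_scaled = (transform(s, stats) for s in (train, val, test))
print(len(train), len(val), len(test))  # 6 2 2
```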

Ray provides a `ray.data.Dataset` API for working with distributed datasets. This API allows you to perform parallel processing on large datasets by automatically partitioning the data into blocks and executing operations on these blocks in parallel.

When you create a `ray.data.Dataset` object, the data is automatically partitioned into blocks based on the size of the data and the available resources in your Ray cluster. Each block is stored as a separate Ray object, allowing multiple tasks to operate on different blocks in parallel.

When you perform operations on a `ray.data.Dataset`, such as applying a transformation or computing an aggregation, Ray automatically schedules tasks to execute these operations on the individual blocks in parallel. The results of these operations are then combined to produce the final result.
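The execution model described above follows a common pattern: partition the data into blocks, process each block independently, then combine the partial results. The sketch below illustrates that pattern in plain Python, with a thread pool standing in for Ray's scheduler. It is an analogy, not how Ray Data is implemented, and the function names are hypothetical. Computing a mean from per-block `(sum, count)` pairs shows why aggregations need a combine step: averaging per-block means would give the wrong answer when blocks differ in size.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List, Sequence, Tuple


def to_blocks(values: Sequence[float], num_blocks: int) -> List[List[float]]:
    """Partition values into num_blocks contiguous, roughly equal blocks."""
    size, extra = divmod(len(values), num_blocks)
    blocks, start = [], 0
    for i in range(num_blocks):
        end = start + size + (1 if i < extra else 0)
        blocks.append(list(values[start:end]))
        start = end
    return blocks


def partial_sum_count(block: List[float]) -> Tuple[float, int]:
    """Per-block work, independent of every other block."""
    return sum(block), len(block)


def parallel_mean(values: Sequence[float], num_blocks: int = 4) -> float:
    """Mean of a non-empty sequence, computed block by block in parallel."""
    blocks = to_blocks(values, num_blocks)
    with ThreadPoolExecutor(max_workers=num_blocks) as pool:
        partials = list(pool.map(partial_sum_count, blocks))
    # Combine step: merge the per-block results into the final answer
    total = sum(s for s, _ in partials)
    count = sum(c for _, c in partials)
    return total / count


print(parallel_mean([float(i) for i in range(1, 11)], num_blocks=3))  # 5.5
```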

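Besides preparing the Ray preprocessing script, we need Ray to be available in the processing container. Here, the script installs `ray` and its other dependencies with `pip` when it starts, which serves the same purpose as a requirements.txt that lists `ray`.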

In [None]:
%%writefile ./pipeline_scripts/preprocessing/script.py
import subprocess
import sys
subprocess.check_call([sys.executable, '-m', 'pip', 'install', 'sagemaker','ray', 'pyarrow >= 6.0.1'])

import argparse
import os

import boto3
import ray
import sagemaker
from sagemaker.feature_store.feature_group import FeatureGroup
from ray.data.preprocessors import StandardScaler

def read_parameters():
    """
    Read job parameters
    Returns:
        (Namespace): read parameters
    """
    parser = argparse.ArgumentParser()
    parser.add_argument('--feature_group_name', type=str, default='fs-synthetic-house-price')
    parser.add_argument('--train_size', type=float, default=0.6)
    parser.add_argument('--val_size', type=float, default=0.2)
    parser.add_argument('--test_size', type=float, default=0.2)
    parser.add_argument('--random_state', type=int, default=42)
    parser.add_argument('--target_col', type=str, default='PRICE')
    parser.add_argument('--region', type=str, default='us-east-1')
    params, _ = parser.parse_known_args()
    return params

def split_dataset(dataset, train_size, val_size, test_size, random_state=None):
    """
    Split dataset into train, validation and test samples
    Args:
        dataset (ray.data.Dataset): input data
        train_size (float): ratio of data to use as training dataset
        val_size (float): ratio of data to use as validation dataset
        test_size (float): ratio of data to use as test dataset
        random_state (int): Pass an int for reproducible output across multiple function calls.
    Returns:
        train_set (ray.data.Dataset): train dataset
        val_set (ray.data.Dataset): validation dataset
        test_set (ray.data.Dataset): test dataset
    """
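    # compare with a tolerance, since float sums such as 0.7 + 0.2 + 0.1 are not exactly 1.0
    if abs((train_size + val_size + test_size) - 1.0) > 1e-9:
        raise ValueError("train_size, val_size and test_size must sum up to 1.0")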
    
    # Shuffle this dataset with a fixed random seed.
    shuffled_ds = dataset.random_shuffle(seed=random_state)
    # Split the data into train, validation and test datasets
    train_set, val_set, test_set = shuffled_ds.split_proportionately([train_size, val_size])
    
    # Sanity check
    # IMPORTANT!!! Do not include this for large datasets as this can be an expensive operation
    train_perc = int((train_set.count()/shuffled_ds.count()) * 100)
    print(f"Training size: {train_set.count()} - {train_perc}% of total")
    val_perc = int((val_set.count()/shuffled_ds.count()) * 100)
    print(f"Val size: {val_set.count()} - {val_perc}% of total")
    test_perc = int((test_set.count()/shuffled_ds.count()) * 100)
    print(f"Test size: {test_set.count()} - {test_perc}% of total")
    return train_set, val_set, test_set

def scale_dataset(train_set, val_set, test_set, target_col):
    """
    Fit StandardScaler to train_set and apply it to val_set and test_set
    Args:
        train_set (ray.data.Dataset): train dataset
        val_set (ray.data.Dataset): validation dataset
        test_set (ray.data.Dataset): test dataset
        target_col (str): target col
    Returns:
        train_transformed (ray.data.Dataset): train data scaled
        val_transformed (ray.data.Dataset): val data scaled
        test_transformed (ray.data.Dataset): test data scaled
    """
    
    transform_cols = train_set.columns()
    # Remove the target column from being scaled
    transform_cols.remove(target_col)
    # set up a standard scaler
    standard_scaler = StandardScaler(transform_cols)
    # fit scaler to training dataset
    print("Fitting scaler to training data and transforming dataset...")
    train_set_transformed = standard_scaler.fit_transform(train_set)
    # apply scaler to validation and test datasets
    print("Transforming validation and test datasets...")
    val_set_transformed = standard_scaler.transform(val_set)
    test_set_transformed = standard_scaler.transform(test_set)
    return train_set_transformed, val_set_transformed, test_set_transformed

def load_dataset(feature_group_name, region):
    """
    Loads the data as a Ray dataset from the feature group's offline store S3 location
    Args:
        feature_group_name (str): name of the feature group
        region (str): AWS Region of the feature group
    Returns:
        ds (ray.data.Dataset): Ray dataset that contains the requested data from the feature store
    """
    session = sagemaker.Session(boto3.Session(region_name=region))
    fs_group = FeatureGroup(
        name=feature_group_name, 
        sagemaker_session=session
    )

    fs_data_loc = fs_group.describe().get("OfflineStoreConfig").get("S3StorageConfig").get("ResolvedOutputS3Uri")
    
    # Drop columns added by the feature store
    # Since these are not related to the ML problem at hand
    cols_to_drop = ["record_id", "event_time","write_time", 
                    "api_invocation_time", "is_deleted", 
                    "year", "month", "day", "hour"]           

    ds = ray.data.read_parquet(fs_data_loc)
    ds = ds.drop_columns(cols_to_drop)
    print(f"{fs_data_loc} count is {ds.count()}")
    return ds

print(f"===========================================================")
print(f"Starting pre-processing")
print(f"Reading parameters")

# reading job parameters
args = read_parameters()
print(f"Parameters read: {args}")

# set output paths
train_data_path = "/opt/ml/processing/output/train"
val_data_path = "/opt/ml/processing/output/validation"
test_data_path = "/opt/ml/processing/output/test"

for path in (train_data_path, val_data_path, test_data_path):
    os.makedirs(path, exist_ok=True)

# read data input
dataset = load_dataset(args.feature_group_name, args.region)

# split dataset into train, validation and test
train_set, val_set, test_set = split_dataset(
    dataset,
    train_size=args.train_size,
    val_size=args.val_size,
    test_size=args.test_size,
    random_state=args.random_state
)


# scale datasets
train_transformed, val_transformed, test_transformed = scale_dataset(
    train_set, 
    val_set, 
    test_set,
    args.target_col
)

print("Saving data")
train_transformed.write_csv(train_data_path)
val_transformed.write_csv(val_data_path)
test_transformed.write_csv(test_data_path)
print(f"Ending pre-processing")
print(f"===========================================================")

In [None]:
sklearn_processor = SKLearnProcessor(
    framework_version='1.0-1',
    role=role_arn,
    instance_type=processing_instance_type,
    instance_count=processing_instance_count
)

In [None]:
# Optional step
# Delete all files under the S3 output prefix before beginning preprocessing.
# This prevents duplicated data when running this workshop multiple times.
# Comment out the files.delete() line below if you want to keep outputs from earlier runs.

s3 = boto3.resource('s3')
print(bucket)
bucket_obj = s3.Bucket(bucket)
print(f"{s3_prefix}/data/sm_processed/")
files = bucket_obj.objects.filter(Prefix=f"{s3_prefix}/data/sm_processed/")

files.delete()

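We're now ready to run the Processing job for splitting and scaling the data. Records written with `put_record` reach the offline store asynchronously, so if the job finds no data, wait a few minutes after the ingestion job completes and run it again.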

In [None]:
train_s3_destination = f'{output_path}/train'
val_s3_destination = f'{output_path}/validation'
test_s3_destination = f'{output_path}/test'

`test_s3_destination` is where the processing job writes the test split. Once the job has finished, we'll download the test set to a local directory; we will use it in notebook 3 to test our deployment.

In [None]:
test_s3_destination

In [None]:
from IPython.display import display, HTML
# Note: `code` can also be an S3 URI pointing to the processing script
job_name = f"processing-{strftime('%Y-%m-%d-%H-%M-%S')}"

display(
    HTML(
        '<b>Review <a target="_blank" href="https://console.aws.amazon.com/sagemaker/home?region={}#/processing-jobs/{}">Processing Job</a></b>'.format(
            region, job_name
        )
    )
)
sklearn_processor.run(
    code='pipeline_scripts/preprocessing/script.py',
    job_name=job_name,
    outputs=[
        ProcessingOutput(
            output_name='train',
            destination=train_s3_destination,
            source='/opt/ml/processing/output/train'
        ),          
        ProcessingOutput(
            output_name='validation',
            destination=val_s3_destination,
            source='/opt/ml/processing/output/validation'
        ),
        ProcessingOutput(
            output_name='test',
            destination=test_s3_destination,
            source='/opt/ml/processing/output/test'
        )
    ],
    # notice that all arguments passed to a SageMaker processing job should be strings as they are transformed to command line parameters.
    # Your read_parameters function will handle the data types for your code 
    arguments=[
        "--feature_group_name", feature_group_name,
        "--train_size", str(train_size),
        "--val_size", str(val_size),
        "--test_size", str(test_size),
        "--random_state", str(random_seed),
        "--region", region
    ]
)
preprocessing_job_description = sklearn_processor.jobs[-1].describe()

In [None]:
from sagemaker.s3 import S3Downloader

S3Downloader.download(s3_uri=test_s3_destination, local_path=processed_test_dir)

### Sanity Check

In [None]:
print(f'{output_path}/train')
train_ds = ray.data.read_csv(f'{output_path}/train')
train_ds.count()

In [None]:
%store feature_group_name
%store s3_prefix
%store output_path
%store train_s3_destination
%store val_s3_destination
%store test_s3_destination
%store raw_s3
