# Music Recommender Lab 1: Data Preparation and Processing

### Loading stored variables
If you ran this notebook before, you may want to reuse the resources you already created with AWS. Run the cell below to load any previously created variables. You should see a printout of the existing variables. If nothing is printed, this is probably the first time you are running the notebook.

* Make sure to run `00_overview_arch_data.ipynb` in the notebook folder before running this notebook.

In [None]:
%store -r
%store

## Preparation


Let's start by specifying:

- The S3 bucket and prefix that you want to use for training and model data. The bucket should be in the same region as the notebook instance, training, and hosting.
- The IAM role ARN used to give training and hosting access to your data. See the documentation for how to create these. Note: if more than one role is required for notebook instances, training, and/or hosting, replace the `get_execution_role()` call with the appropriate full IAM role ARN string(s).

In [None]:
import sagemaker
import boto3
import re
import os
from sagemaker import get_execution_role

from sagemaker.processing import FrameworkProcessor
from sagemaker.processing import ProcessingInput, ProcessingOutput
from sagemaker.utils import name_from_base
from sagemaker.sklearn.estimator import SKLearn



In [None]:
import numpy as np                                # For matrix operations and numerical processing
import pandas as pd                               # For munging tabular data
import matplotlib.pyplot as plt                   # For charts and visualizations
from IPython.display import Image                 # For displaying images in the notebook
from IPython.display import display               # For displaying outputs in the notebook
from time import gmtime, strftime                 # For labeling SageMaker models, endpoints, etc.
import sys                                        # For writing outputs to notebook

sagemaker_session = sagemaker.Session()            # Reuse one session for all SageMaker calls
bucket = sagemaker_session.default_bucket()
prefix = 'music-recommendation-workshop'

role = get_execution_role()

In [None]:
if 'new_data_paths' not in locals():
    new_data_paths = [f's3://{bucket}/{prefix}/input/tracks.csv',
                      f's3://{bucket}/{prefix}/input/ratings.csv']
else:
    print(f'input source is available: {new_data_paths}')

%store new_data_paths

# Feature Engineering with Amazon SageMaker Processing

Amazon SageMaker Processing allows you to run steps for data pre- or post-processing, feature engineering, data validation, or model evaluation workloads on Amazon SageMaker. Processing jobs accept data from Amazon S3 as input and store data into Amazon S3 as output.

![processing](https://sagemaker.readthedocs.io/en/stable/_images/amazon_sagemaker_processing_image1.png)

Here, we'll import the dataset and transform it with SageMaker Processing, which can be used to process terabytes of data in a SageMaker-managed cluster separate from the instance running your notebook server. In a typical SageMaker workflow, notebooks are only used for prototyping and can be run on relatively inexpensive and less powerful instances, while processing, training and model hosting tasks are run on separate, more powerful SageMaker-managed instances.  SageMaker Processing includes off-the-shelf support for Scikit-learn, as well as a Bring Your Own Container option, so it can be used with many different data transformation technologies and tasks.    

To use SageMaker Processing, simply supply a Python data preprocessing script as shown below.  For this example, we're using a SageMaker prebuilt Scikit-learn container, which includes many common functions for processing data.  There are few limitations on what kinds of code and operations you can run, and only a minimal contract:  input and output data must be placed in specified directories.  If this is done, SageMaker Processing automatically loads the input data from S3 and uploads transformed data back to S3 when the job is complete.
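The directory contract described above can be tried locally before any job is launched. The following is a minimal sketch, not the lab's actual script: the `process` function, the `input`/`output` subfolder names, and the "drop incomplete rows" transformation are all assumptions chosen for illustration, and a temporary directory stands in for `/opt/ml/processing`.

```python
import csv
import pathlib
import tempfile


def process(base_dir):
    """Copy each CSV from <base_dir>/input to <base_dir>/output, dropping incomplete rows.

    Hypothetical helper for illustration; the folder names are assumptions.
    """
    base = pathlib.Path(base_dir)
    in_dir = base / "input"
    out_dir = base / "output"
    out_dir.mkdir(parents=True, exist_ok=True)

    written = []
    for src in sorted(in_dir.glob("*.csv")):
        with src.open(newline="") as f:
            # Keep only non-empty rows in which every cell has a value
            rows = [row for row in csv.reader(f) if row and all(c.strip() for c in row)]
        dst = out_dir / src.name
        with dst.open("w", newline="") as f:
            csv.writer(f).writerows(rows)
        written.append(dst)
    return written


# Local dry run: a temporary directory stands in for /opt/ml/processing
with tempfile.TemporaryDirectory() as tmp:
    in_dir = pathlib.Path(tmp) / "input"
    in_dir.mkdir()
    (in_dir / "sample.csv").write_text("a,b\n1,2\n3,\n")
    outputs = process(tmp)
    with outputs[0].open(newline="") as f:
        cleaned = list(csv.reader(f))
    print(cleaned)
```

In a real job the script only reads from and writes to the container directories; the copy between those directories and S3 is handled by SageMaker Processing, as described above.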

### Create Sklearn SageMaker Processing script

**Task 1**:
Fill in the missing parts of the `create_datasets.py` script to perform feature engineering and to prepare the training, validation, and test datasets.
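As a rough illustration of what a three-way split does, here is a standard-library sketch. It is not the reference solution: the function name `three_way_split`, the 80/10/10 proportions, and the seed are assumptions made for this example, and the reference notebook may use different values. In the script itself, the already imported `train_test_split` from scikit-learn would be the more natural tool.

```python
import random


def three_way_split(rows, val_frac=0.1, test_frac=0.1, seed=42):
    """Shuffle rows and cut them into train, validation, and test lists.

    Hypothetical helper; the fractions and seed are illustrative assumptions.
    """
    if val_frac < 0 or test_frac < 0 or val_frac + test_frac >= 1:
        raise ValueError("val_frac and test_frac must be non-negative and sum to less than 1")

    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)  # seeded so the split is reproducible

    n_test = int(len(shuffled) * test_frac)
    n_val = int(len(shuffled) * val_frac)

    test = shuffled[:n_test]
    val = shuffled[n_test:n_test + n_val]
    train = shuffled[n_test + n_val:]  # everything that is left
    return train, val, test


train_rows, val_rows, test_rows = three_way_split(range(100))
print(len(train_rows), len(val_rows), len(test_rows))
```

Whatever method is used, the three subsets should be disjoint and together cover the full dataset, so that no row leaks from training into evaluation.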

In [16]:
%%writefile ./code/create_datasets.py

import argparse
import pandas as pd
import pathlib
from sklearn.model_selection import train_test_split
import sys
import logging
import os
import glob



logger = logging.getLogger()
logger.setLevel(logging.INFO)
logger.addHandler(logging.StreamHandler())


# Parse argument variables passed via the CreateDataset processing step
def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser()
    parser.add_argument('--base_dir', type=str, default="/opt/ml/processing")
    args, _ = parser.parse_known_args()
    return args


def enrich_data(df_tracks: pd.DataFrame, df_ratings: pd.DataFrame):
    #----------------------------------------------------------
    # TODO - feature engineering
    # Please fill in this section of code by referring to the reference_notebook.ipynb notebook
    #----------------------------------------------------------
    return df_output

def load_data(file_list: list):
    # Define columns to use
    use_cols = []
    # Concat input files
    dfs = []
    for file in file_list:
        if len(use_cols)==0:
            dfs.append(pd.read_csv(file))
        else:
            dfs.append(pd.read_csv(file, usecols=use_cols))    
    return pd.concat(dfs, ignore_index=True)

def save_files(base_dir: str, df_processed: pd.DataFrame):
    
    # split data 
    #----------------------------------------------------------
    # TODO - split train, val and test data
    # Please fill in this section of code by referring to the reference_notebook.ipynb notebook
    #----------------------------------------------------------
    logger.info("Training dataset shape: {}\nValidation dataset shape: {}\nTest dataset shape: {}\n".format(train.shape, val.shape, test.shape))

    # Write train, validation, and test splits to their output paths
    train_output_path = pathlib.Path(f'{base_dir}/output/train')
    val_output_path = pathlib.Path(f'{base_dir}/output/val')
    test_output_path = pathlib.Path(f'{base_dir}/output/test')
    train.to_csv(train_output_path / 'train.csv', header=False, index=False)
    val.to_csv(val_output_path / 'validation.csv', header=False, index=False)
    test.to_csv(test_output_path / 'test.csv', header=False, index=False)

    logger.info('Training, validation, and Testing Sets Created')
    
    return


def main(base_dir: str, args: argparse.Namespace):
    # Input tracks files
    input_dir = os.path.join(base_dir, "input/tracks")
    track_file_list = glob.glob(f"{input_dir}/*.csv")
    logger.info(f"Input file list: {track_file_list}")
             
    if len(track_file_list) == 0:
        raise Exception(f"No input files found in {input_dir}")

    # Input ratings file
    ratings_dir = os.path.join(base_dir, "input/ratings")
    ratings_file_list = glob.glob(f"{ratings_dir}/*.csv")
    logger.info(f"Input file list: {ratings_file_list}")
    if len(ratings_file_list) == 0:
        raise Exception(f"No ratings files found in {ratings_dir}")

    # load data into dataframes
    df_tracks = load_data(track_file_list)
    df_ratings = load_data(ratings_file_list)
    
    # Combine tracks and ratings into the engineered feature set
    df_processed = enrich_data(df_tracks, df_ratings)
    
    return save_files(base_dir, df_processed)

if __name__ == "__main__":
    logger.info("Starting preprocessing.")
    args = parse_args()
    base_dir = args.base_dir
    main(base_dir, args)
    logger.info("Done")



Overwriting ./code/create_datasets.py


In [None]:
process_script = os.getcwd() + '/code/create_datasets.py'
%store process_script

Before starting the SageMaker Processing job, we instantiate a `FrameworkProcessor` object that uses the `SKLearn` estimator class to select the Scikit-learn container. This object lets you specify the instance type to use in the job, as well as the number of instances.

In [None]:
est_cls = SKLearn
framework_version_str="0.20.0"

base_job_name = 'sm-music-processing'
sklearn_processor = FrameworkProcessor(
    estimator_cls=est_cls,
    framework_version=framework_version_str,
    role=get_execution_role(),
    instance_type="ml.m5.xlarge",
    instance_count=1, 
    base_job_name=base_job_name,
)
processing_job_name = name_from_base(base_job_name)

In [None]:
train_path = f"s3://{bucket}/{prefix}/train/{processing_job_name}"
val_path = f"s3://{bucket}/{prefix}/val/{processing_job_name}"
test_path = f"s3://{bucket}/{prefix}/test/{processing_job_name}"

In [None]:
sklearn_processor.run(
    code='create_datasets.py',
    source_dir='code',
    arguments = [
                 '--base_dir', '/opt/ml/processing',
                ],
    inputs=[
        ProcessingInput(
            source=[v for v in new_data_paths if 'tracks' in v][0],
            destination="/opt/ml/processing/input/tracks",
            s3_data_distribution_type="FullyReplicated",
        ),
        ProcessingInput(
            source=[v for v in new_data_paths if 'ratings' in v][0],
            destination="/opt/ml/processing/input/ratings",
            s3_data_distribution_type="FullyReplicated",
        ),
    ],
    outputs=[
        ProcessingOutput(output_name="train", source="/opt/ml/processing/output/train", destination=train_path),
        ProcessingOutput(output_name="validation", source="/opt/ml/processing/output/val", destination=val_path),
        ProcessingOutput(output_name="test", source="/opt/ml/processing/output/test", destination=test_path),
    ],
    job_name=processing_job_name,

)
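The `source` expressions above take the first path that contains a keyword, which raises a bare `IndexError` if no path matches. A slightly more defensive lookup might look like the sketch below. The helper name `pick_path` and the example bucket name are hypothetical, introduced only for illustration.

```python
def pick_path(paths, keyword):
    """Return the single path whose file name contains keyword.

    Hypothetical helper; matches on the file name only, so a keyword that
    happens to appear in the bucket name or prefix is ignored.
    """
    matches = [p for p in paths if keyword in p.rsplit("/", 1)[-1]]
    if len(matches) != 1:
        raise ValueError(
            f"expected exactly one path whose file name contains {keyword!r}, "
            f"found {len(matches)}: {matches}"
        )
    return matches[0]


# Example with a made-up bucket name
example_paths = [
    "s3://example-bucket/music-recommendation-workshop/input/tracks.csv",
    "s3://example-bucket/music-recommendation-workshop/input/ratings.csv",
]
tracks_source = pick_path(example_paths, "tracks")
ratings_source = pick_path(example_paths, "ratings")
print(tracks_source, ratings_source)
```

Failing with a descriptive message before the job starts is cheaper than discovering a missing input after an instance has been provisioned.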

In [None]:
%store train_path
%store val_path
%store test_path

---

## End of Lab 1