# EMR Data Prep + SageMaker Deep Learning

This notebook was tested using the `Studio SparkMagic - PySpark Kernel` running on an `ml.t3.medium` instance, connected to an EMR cluster with one `m5.xlarge` master node and two `m5.xlarge` core nodes. Please make sure you see `PySpark (SparkMagic)` in the top right of your notebook.

In this three-part notebook lesson, we'll first use EMR to prepare the data and serialize it to S3. Next, we'll prototype a deep learning architecture in SageMaker Studio notebooks, and finally, we'll scale training using ephemeral SageMaker training jobs.

### Connection to EMR Cluster

The code in the cell below is autogenerated. You can generate it by clicking the "Cluster" link at the top of the notebook and selecting the EMR cluster.

For our workshop, we'll pass our SageMaker execution role to the cluster, but this works equally well with Kerberos, LDAP, and HTTP authentication mechanisms.
![img](https://user-images.githubusercontent.com/18154355/216500654-a18ac11a-c405-4704-b9f6-c6cd4f4fb324.png)

## Overview

In this notebook, we'll use a remote EMR cluster to prepare our dataset for building a regression model.


## Inspect the public NYC Taxi Dataset

In this lab, we'll be using the Registry of Open Data on AWS to access the New York City Taxi and Limousine Commission (TLC) Trip Record Data:
[https://registry.opendata.aws/nyc-tlc-trip-records-pds/](https://registry.opendata.aws/nyc-tlc-trip-records-pds/)

[https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)

Ultimately, the goal is to use the available data to predict the cost of a trip. We plan to solve this regression problem using a deep neural network.


In [None]:
%%local
!aws s3 ls "s3://nyc-tlc/trip data/green" --human-readable | grep green_tripdata_2020

The listing shows that the dataset is large, which makes this a good use case for a distributed processing framework like Apache Spark to prepare our dataset.

In [None]:
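# Read all 2020 green taxi trip files. cache() marks the DataFrame for reuse,
# and count() triggers the read so that the cache is materialized.
df = spark.read.parquet("s3://nyc-tlc/trip data/green_tripdata_2020*.parquet").cache()
df.count()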

## Format the dataset

In [None]:
%%pretty
from pyspark.sql.functions import col, dayofweek, month, hour

# Derive time-based features from the pickup timestamp and keep the trip
# attributes plus the fare, which is our regression target.
df_dt = df.select(
    dayofweek(col("lpep_pickup_datetime")).alias("day_of_week"),
    month(col("lpep_pickup_datetime")).alias("month"),
    hour(col("lpep_pickup_datetime")).alias("hour"),
    col("PULocationID").alias("pickup_location_id"),
    col("DOLocationID").alias("dropoff_location_id"),
    col("Trip_distance").alias("trip_distance"),
    col("Fare_amount").alias("fare_amount"),
)
df_dt.show()
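Spark's `dayofweek` numbers days from 1 (Sunday) to 7 (Saturday), which differs from Python's `datetime.weekday()` (0 = Monday). If you ever need to reproduce these time features outside Spark, for example when building inference inputs later on, a minimal pure-Python sketch of the same three derived columns might look like this. The helper names `spark_dayofweek` and `extract_time_features` are hypothetical, not part of any library.

```python
from datetime import datetime


def spark_dayofweek(ts: datetime) -> int:
    """Mirror pyspark.sql.functions.dayofweek: 1 = Sunday ... 7 = Saturday."""
    # isoweekday() returns 1 = Monday ... 7 = Sunday; shift so Sunday becomes 1.
    return ts.isoweekday() % 7 + 1


def extract_time_features(pickup: datetime) -> dict:
    """Hypothetical helper producing the same time features as the Spark select."""
    return {
        "day_of_week": spark_dayofweek(pickup),
        "month": pickup.month,  # 1-12, same range as Spark's month()
        "hour": pickup.hour,    # 0-23, same range as Spark's hour()
    }


# 5 January 2020 was a Sunday, so Spark would report day_of_week = 1.
print(extract_time_features(datetime(2020, 1, 5, 14, 30)))
# → {'day_of_week': 1, 'month': 1, 'hour': 14}
```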

## Filter the dataset

In [None]:
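# Keep trips with a positive fare below $200, dropping zero or negative fares
# and extreme outliers.
df_dt = df_dt[
    (df_dt.fare_amount > 0)
    & (df_dt.fare_amount < 200)
]
df_dt.count()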

## Use PySpark to create train, test, validation splits of our formatted and filtered data

In [None]:
# First split off a 20% holdout, then divide the holdout into
# validation (5%) and test (95%).
train_df, val_df = df_dt.randomSplit([0.8, 0.2], seed=42)
val_df, test_df = val_df.randomSplit([0.05, 0.95], seed=42)

print("Train Count:", train_df.count())
print("Validation Count:", val_df.count())
print("Test Count:", test_df.count())
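Because the second `randomSplit` only divides the 20% holdout, the effective proportions of the filtered data are about 80% train, 1% validation (0.2 × 0.05), and 19% test (0.2 × 0.95). `randomSplit` normalizes its weights and assigns rows randomly, so the printed counts will only approximate these fractions. The arithmetic can be checked with a small sketch; the function name `effective_fractions` is just for illustration.

```python
def effective_fractions(first_split, second_split):
    """Fractions of the full dataset after splitting the holdout a second time.

    first_split: (train, holdout) weights passed to the first randomSplit.
    second_split: (validation, test) weights applied to the holdout.
    """
    # randomSplit normalizes its weights, so normalize here as well.
    train_w, holdout_w = first_split
    total = train_w + holdout_w
    train, holdout = train_w / total, holdout_w / total

    val_w, test_w = second_split
    sub_total = val_w + test_w
    return {
        "train": train,
        "validation": holdout * val_w / sub_total,
        "test": holdout * test_w / sub_total,
    }


for name, frac in effective_fractions((0.8, 0.2), (0.05, 0.95)).items():
    print(f"{name}: {frac:.2%}")
```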

In [None]:
%%local 
import sagemaker

sess = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sess.default_bucket()

data_bucket = f"{bucket}/nyc-taxi/data/processed"
print(data_bucket)
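Note that `data_bucket` holds a bucket name followed by a key prefix, without the `s3://` scheme; the write cells later prepend `s3://` and append the split name. A small sketch of that path construction follows, including the bucket/prefix pair that boto3 calls such as `list_objects_v2` take as separate `Bucket` and `Prefix` arguments. The helper names are hypothetical and the bucket name in the example is a placeholder, not a real default bucket.

```python
def s3_uri(data_bucket: str, split: str) -> str:
    """Build the S3 URI used for a split, e.g. s3://<bucket>/<prefix>/train."""
    return f"s3://{data_bucket}/{split}"


def bucket_and_prefix(data_bucket: str, split: str) -> tuple:
    """Split 'bucket/prefix' into a (bucket, key prefix) pair for a given split."""
    bucket, prefix = data_bucket.split("/", 1)
    return bucket, f"{prefix}/{split}/"


example = "sagemaker-us-east-1-111122223333/nyc-taxi/data/processed"  # placeholder
print(s3_uri(example, "train"))
print(bucket_and_prefix(example, "train"))
```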

In [None]:
%%send_to_spark -i data_bucket -t str -n data_bucket

## Write the processed dataset to our S3 bucket

In [None]:
train_df.write.csv(f"s3://{data_bucket}/train", mode='overwrite')
test_df.write.csv(f"s3://{data_bucket}/test", mode='overwrite')
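`DataFrame.write.csv` writes one or more `part-*` files under each prefix and, by default, omits a header row, so downstream code has to know the column order from the `select` above. A minimal sketch of parsing such lines with Python's standard `csv` module, assuming the column order is unchanged and every field is populated; the sample line and the `parse_rows` helper are made up for illustration and are not taken from the dataset.

```python
import csv
import io

# Column order produced by the select/alias step above.
COLUMNS = [
    "day_of_week",
    "month",
    "hour",
    "pickup_location_id",
    "dropoff_location_id",
    "trip_distance",
    "fare_amount",
]


def parse_rows(text: str):
    """Yield (features, target) pairs from headerless CSV text."""
    for values in csv.reader(io.StringIO(text)):
        row = dict(zip(COLUMNS, values))
        target = float(row.pop("fare_amount"))  # regression target
        features = [float(row[name]) for name in COLUMNS[:-1]]
        yield features, target


sample = "4,1,8,74,168,1.2,6.5\n"  # hypothetical row, not real data
for features, target in parse_rows(sample):
    print(features, target)
```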

## Store the data location for the next notebook

In [None]:
%store data_bucket

In [None]:
%%cleanup -f