{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# EMR Data Prep + SageMaker Deep Learning\n", "\n", "This notebook is tested using `Studio SparkMagic - PySpark Kernel` running on a `ml.t3.medium` instance and connected to an EMR clsuter with an `m5.xlarge` Master node and 2 `m5.xlarge` Core nodes. Please ensure that you see `PySpark (SparkMagic)` in the top right on your notebook.\n", "\n", "In this 3 part notebook lesson, we'll see how to use EMR for data prep and serialization to S3. Next we'll prototype a deep learning architecture using SageMaker Studio notebooks, and lastly we'll scale the training using SageMaker ephemeral training jobs." ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "### Connection to EMR Cluster\n", "\n", "In the cell below, the code block is autogenerated. You can generate this code by clicking on the \"Cluster\" link on the top of the notebook and select the EMR cluster. \n", "\n", "For our workshop we be passing our SageMaker execution role to the cluster, but this works equally well for Kerberos, LDAP and HTTP auth mechanisms\n", "![img](https://user-images.githubusercontent.com/18154355/216500654-a18ac11a-c405-4704-b9f6-c6cd4f4fb324.png)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Overview\n", "\n", "In this notebook, we'll use a remote EMR cluster to prepare our dataset for regression model building\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Inspect the public NYC Taxi Dataset\n", "\n", "In this lab, we'll be using the Registry of Open Data on AWS to access the New York City Taxi and Limousine Commission (TLC) Trip Record Data:\n", "[https://registry.opendata.aws/nyc-tlc-trip-records-pds/](https://registry.opendata.aws/nyc-tlc-trip-records-pds/)\n", "\n", "[https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page)\n", "\n", "Ultimately, the goal will be able to use the available data to predict what the cost of a trip will be. We're planning to solve this regression problem using a deep neural network.\n" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local\n", "!aws s3 ls \"s3://nyc-tlc/trip data/green\" --human-readable | grep green_tripdata_2020" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "We can see that the dataset is quite large, which makes this a perfect usecase for using a distributed processing framework like Apache Spark to prep our dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df = spark.read.parquet(\"s3://nyc-tlc/trip data/green_tripdata_2020*.parquet\").cache()\n", "df.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Format the dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%pretty\n", "from pyspark.sql.functions import col, dayofweek, month, hour\n", "df_dt = df.select(dayofweek(col('lpep_pickup_datetime')).alias('day_of_week'),\n", " month(col('lpep_pickup_datetime')).alias('month'),\n", " hour(col('lpep_pickup_datetime')).alias('hour'),\n", " col(\"PULocationID\").alias(\"pickup_location_id\"),\n", " col(\"DOLocationID\").alias(\"dropoff_location_id\"),\n", " col(\"Trip_distance\").alias(\"trip_distance\"),\n", " col(\"Fare_amount\").alias(\"fare_amount\")\n", " )\n", "df_dt.show()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Filter the dataset" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "df_dt = df_dt[\n", " (df_dt.fare_amount > 0)\n", " & (df_dt.fare_amount < 200) \n", "]\n", "df_dt.count()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Use PySpark to create train, test, validation splits of our formatted and filtered data" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_df, val_df = df_dt.randomSplit([0.8, 0.2], seed=42)\n", "val_df, test_df = val_df.randomSplit([0.05, 0.95], seed=42)\n", "\n", "print(\"Train Count:\", train_df.count())\n", "print(\"Validation Count:\", val_df.count())\n", "print(\"Test Count:\", test_df.count())" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%local \n", "import sagemaker\n", "\n", "sess = sagemaker.Session()\n", "role = sagemaker.get_execution_role()\n", "bucket = sess.default_bucket()\n", "\n", "data_bucket = f\"{bucket}/nyc-taxi/data/processed\"\n", "print(data_bucket)" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%send_to_spark -i data_bucket -t str -n data_bucket" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Write the processed dataset to our S3 bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "train_df.write.csv(f\"s3://{data_bucket}/train\", mode='overwrite')\n", "test_df.write.csv(f\"s3://{data_bucket}/test\", mode='overwrite')" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Store data location for next notebook" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%store data_bucket" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "%%cleanup -f" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "PySpark (SparkMagic)", "language": "python", "name": "pysparkkernel__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/sagemaker-sparkmagic" }, "language_info": { "codemirror_mode": { "name": "python", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "pyspark", "pygments_lexer": "python3" } }, "nbformat": 4, "nbformat_minor": 4 }