{ "cells": [ { "cell_type": "markdown", "id": "d6be2d68", "metadata": {}, "source": [ "# Flight Delay Prediction with Distributed Deep Learning\n", "In this notebook we'll use a flight dataset from [here](https://kt.ijs.si/elena_ikonomovska/data.html). The data containsflight arrival and departure details for all the commercial flights within the USA, from October 1987 to April 2008. Its has about 117 million records and is about 6Gb in size" ] }, { "cell_type": "markdown", "id": "46ca0217", "metadata": {}, "source": [ "Rather that a traditional MLP architecture, we'll instead train a TabNet model. TabNet is a deep neural network model for Tabular Data proposed by Google [link](https://arxiv.org/abs/1908.07442).It utilizes transformer blocks and attention mechanism that have had significant success in the NLP and Computer Vision Domains. We'll also utilize [Ray](https://docs.ray.io/en/latest/index.html) for distributed data loading and preprocessing\n", "\n", "![tabnet](image/tabnet.png)" ] }, { "cell_type": "markdown", "id": "67f93081", "metadata": {}, "source": [ "## Advantages of Tabnet\n", "\n", "- Train multi-objective models (e.g. multi-label, multi-regressor)\n", "- Uses attention mechnism for feature selection. Which can also be used to provide local explanations at inference time\n", "- It uses attention for selecting out the set of features to focus on for a given particular data point and we can even visualize that to see which parts get attention for a particular decision . We can also play with the number of features we want the Tabnet to focus to.\n", "- Like other DL models, can easily utilize custom loss functions\n", "- Can be pre-trained and finetuned similar to language and vision transformer models\n", "- Can be trained on large number of features as it performs automatic feature selection which reduces a the need to do this during preprocessing\n", "- Can be trained on large datasets without having to load the entire data into memory \n", "- Potentially requires less feature engineering" ] }, { "cell_type": "markdown", "id": "e1e84b70", "metadata": {}, "source": [ "## Download Data\n", "Download data from a public S3 bucket" ] }, { "cell_type": "code", "execution_count": null, "id": "56cd3a33", "metadata": {}, "outputs": [], "source": [ "!wget http://ee-assets-prod-us-east-1.s3.amazonaws.com/modules/05fa7598d4d44836a42fde79b26568b2/v2/airline_14col.data.bz2 -P data/" ] }, { "cell_type": "markdown", "id": "6ebc8e81", "metadata": {}, "source": [ "## Prepare Data\n", "- divide the single CSV file into multiple parquet files\n", "- split into train and test datasets\n", "- capture categorical feature data for future encoding purposes" ] }, { "cell_type": "code", "execution_count": null, "id": "b62d40a9", "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "import numpy as np\n", "import json\n", "from pathlib import Path\n", "from tqdm.notebook import tqdm" ] }, { "cell_type": "code", "execution_count": null, "id": "c23c3a66", "metadata": {}, "outputs": [], "source": [ "cwd = Path.cwd()\n", "data_path = cwd / \"data\"\n", "train_data_path = data_path / \"train\"\n", "test_data_path = data_path / \"test\"\n", "\n", "train_data_path.mkdir()\n", "test_data_path.mkdir()" ] }, { "cell_type": "code", "execution_count": null, "id": "15eb6328", "metadata": {}, "outputs": [], "source": [ "cols = [\n", " \"Year\", \"Month\", \"DayofMonth\", \"DayofWeek\", \"CRSDepTime\",\n", " \"CRSArrTime\", \"UniqueCarrier\", \"FlightNum\", \"ActualElapsedTime\",\n", " \"Origin\", 
\"Dest\", \"Distance\", \"Diverted\", \"ArrDelay\"\n", " ]" ] }, { "cell_type": "code", "execution_count": null, "id": "b2d1fba9", "metadata": {}, "outputs": [], "source": [ "df_sample = pd.read_csv(data_path / \"airline_14col.data.bz2\", nrows=100, names=cols)\n", "df_sample.head()" ] }, { "cell_type": "code", "execution_count": null, "id": "2d25b719", "metadata": {}, "outputs": [], "source": [ "cat_cols = [\"Month\", \"DayofMonth\", \"DayofWeek\", \"UniqueCarrier\", \"Origin\", \"Dest\", \"Diverted\"]\n", "cat_unique_values = {cat_col: set() for cat_col in cat_cols} # capture unique values for each categorical column" ] }, { "cell_type": "code", "execution_count": null, "id": "314b50fe", "metadata": {}, "outputs": [], "source": [ "chunks = pd.read_csv(\"data/airline_14col.data.bz2\", chunksize=2_000_000, names=cols)" ] }, { "cell_type": "code", "execution_count": null, "id": "4fe2191c", "metadata": {}, "outputs": [], "source": [ "# Converting the data into parquet chunks\n", "for n,chunk in tqdm(enumerate(chunks), desc=\"Converting to parquet\", total=58) :\n", " for col in cat_cols:\n", " cat_unique_values[col].update(set(chunk[col].unique()))\n", " if chunk.iloc[0][\"Year\"] < 2004:\n", " chunk.to_parquet(train_data_path / f\"{n}.snappy.parquet\", index=False)\n", " else:\n", " chunk.to_parquet(test_data_path / f\"{n}.snappy.parquet\", index=False)" ] }, { "cell_type": "code", "execution_count": null, "id": "5ec6d91d", "metadata": {}, "outputs": [], "source": [ "!rm data/airline_14col.data.bz2" ] }, { "cell_type": "code", "execution_count": null, "id": "df28eb25", "metadata": {}, "outputs": [], "source": [ "for col in [\"Month\", \"DayofMonth\", \"DayofWeek\", \"Diverted\"]:\n", " cat_unique_values[col] = set(map(int, cat_unique_values[col]))\n", "\n", "cat_num_unique = {k: len(v) for k,v in cat_unique_values.items()}\n", "cat_encoders = {k: dict(zip(cat_unique_values[k], range(len(cat_unique_values[k])))) for k in cat_unique_values}\n", "cat_embed_size = {k: int(np.log1p(len(v)))+1 for k,v in cat_unique_values.items()}" ] }, { "cell_type": "code", "execution_count": null, "id": "a65856c0", "metadata": {}, "outputs": [], "source": [ "cat_col_meta_path = cwd / \"col_meta\"\n", "cat_col_meta_path.mkdir(exist_ok=True)\n", "\n", "(cat_col_meta_path / \"encoders.json\").open(\"w\").write(json.dumps(cat_encoders))\n", "(cat_col_meta_path / \"embed_size.json\").open(\"w\").write(json.dumps(cat_embed_size))\n", "(cat_col_meta_path / \"num_unique.json\").open(\"w\").write(json.dumps(cat_num_unique))" ] }, { "cell_type": "markdown", "id": "e21bfc4b", "metadata": {}, "source": [ "## Train model with PyTorch and TensorFlow" ] }, { "cell_type": "code", "execution_count": null, "id": "60b21d18", "metadata": {}, "outputs": [], "source": [ "import boto3\n", "import uuid\n", "import shutil\n", "import sagemaker\n", "from sagemaker.pytorch import PyTorch\n", "from sagemaker.tensorflow import TensorFlow\n", "\n", "sess = sagemaker.session.Session()\n", "role = sagemaker.get_execution_role()\n", "bucket = sess.default_bucket()\n", "key_prefix = f\"{uuid.getnode()}/distributed_tabnet\"" ] }, { "cell_type": "code", "execution_count": null, "id": "9d93c4c5", "metadata": {}, "outputs": [], "source": [ "# upload data to s3\n", "s3_train_data_path = sess.upload_data(\"data/train/\", bucket=bucket, key_prefix=f\"{key_prefix}/train\")\n", "s3_test_data_path = sess.upload_data(\"data/test/\", bucket=bucket, key_prefix=f\"{key_prefix}/test\")\n", "s3_meta_data_path = sess.upload_data(\"col_meta/\", bucket=bucket, 
key_prefix=f\"{key_prefix}/meta\")" ] }, { "cell_type": "code", "execution_count": null, "id": "2507cb35", "metadata": {}, "outputs": [], "source": [ "# optionally set subnets and security_groups\n", "subnets=None\n", "security_group_ids=None" ] }, { "cell_type": "code", "execution_count": null, "id": "0559216c", "metadata": {}, "outputs": [], "source": [ "# Training with TensorFlow\n", "tb_logging_path = f\"s3://{bucket}/{key_prefix}/tb_logs/tf\"\n", "shutil.copyfile(\"src/requirements_tf.txt\", \"src/requirements.txt\")\n", "tf_estimator = TensorFlow(\n", " source_dir = \"src\",\n", " entry_point=\"train_airlines_tf.py\",\n", " subnets=subnets,\n", " security_group_ids=security_group_ids,\n", " role=role,\n", " instance_count=1,\n", " hyperparameters={\"s3_train_data\":s3_train_data_path, \n", " \"s3_test_data\":s3_test_data_path, \n", " \"s3_schema_file\":f\"{s3_train_data_path}/0.snappy.parquet\",\n", " \"epochs\":2, \"batch_size\": 50_000, \"lr\": 2e-2},\n", " instance_type=\"ml.g4dn.12xlarge\", # try with ml.g5.12xlarge if limit exception raised\n", " framework_version=\"2.8\",\n", " py_version=\"py39\",\n", " checkpoint_s3_uri=tb_logging_path,\n", " keep_alive_period_in_seconds=1800\n", ")\n", "\n", "tf_estimator.fit({\"meta\": s3_meta_data_path}, wait=False) # change wait=True if you wnat to see the logs" ] }, { "cell_type": "markdown", "id": "4c7a0c50-45b1-4cd5-951a-3a7dc4154cbe", "metadata": {}, "source": [ "### Analyze the model with TensorBoard\n", "**Note: You have to wait a few minutes for the job to launch before seeing any logs**\n", "\n", "We can use [TensorBoard](https://www.tensorflow.org/tensorboard), a visualization toolkit for analyzing deep learning models. Instructions for using TensorBoard with SageMaker Studio can be found [here](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-tensorboard.html). Instructions for accessing TensorBoard in SageMaker Studio are provided below:\n", "1. Open a new terminal in SageMaker Studio by navigating to File->New->Terminal
{ "cell_type": "markdown", "id": "4c7a0c50-45b1-4cd5-951a-3a7dc4154cbe", "metadata": {}, "source": [ "### Analyze the model with TensorBoard\n", "**Note: You have to wait a few minutes for the job to launch before seeing any logs**\n", "\n", "We can use [TensorBoard](https://www.tensorflow.org/tensorboard), a visualization toolkit for analyzing deep learning models. General instructions for using TensorBoard with SageMaker Studio can be found [here](https://docs.aws.amazon.com/sagemaker/latest/dg/studio-tensorboard.html); the steps for this notebook are:\n", "1. Open a new terminal in SageMaker Studio by navigating to File->New->Terminal ![](./image/open_terminal.JPG)\n", "2. Run the notebook cell below to generate a terminal command\n", "3. Copy the command, paste it into the terminal, and hit Enter\n", "4. Return to the notebook and click the link provided in the cell below" ] },
{ "cell_type": "code", "execution_count": null, "id": "ebb39a50", "metadata": {}, "outputs": [], "source": [ "from IPython.display import HTML\n", "import os\n", "\n", "cur_dir = os.getcwd().replace(os.environ[\"HOME\"], \"\")  # notebook directory relative to $HOME\n", "HTML(f'''1. Paste the following command into the Studio Terminal: <code>tensorboard --logdir {tb_logging_path}</code><br>\n", "(You may need to run <code>pip install tensorboard</code> and <code>pip install tensorflow_io</code> if TensorBoard is not already installed)<br>\n", "2. Click here to open TensorBoard''')" ] },
\n", "(You may need to run pip install tensorboard and pip install tensorflow_io if TesorBoard is not already installed)\n", "
\n", "2. Click here to open TensorBoard''')" ] }, { "cell_type": "code", "execution_count": null, "id": "d0936336", "metadata": {}, "outputs": [], "source": [ "# Training with PyTorch\n", "tb_logging_path = f\"s3://{bucket}/{key_prefix}/tb_logs/pt\"\n", "shutil.copyfile(\"src/requirements_pt.txt\", \"src/requirements.txt\")\n", "pt_estimator = PyTorch(\n", " source_dir = \"src\",\n", " entry_point=\"train_airlines_pt.py\",\n", " subnets=subnets,\n", " security_group_ids=security_group_ids,\n", " role=role,\n", " instance_count=1, \n", " instance_type=\"ml.g4dn.12xlarge\", # try with ml.g5.12xlarge if limit exception raised\n", " framework_version=\"1.10\",\n", " py_version=\"py38\",\n", " hyperparameters={\"s3_train_data\":s3_train_data_path, \n", " \"s3_test_data\":s3_test_data_path, \n", " \"s3_schema_file\":f\"{s3_train_data_path}/0.snappy.parquet\",\n", " \"tb_logging_path\":tb_logging_path,\n", " \"epochs\":3, \"batch_size\": 50_000, \"lr\": 2e-2},\n", " keep_alive_period_in_seconds=1800\n", ")\n", "\n", "pt_estimator.fit({\"meta\": s3_meta_data_path}, wait=False) # change wait=True if you wnat to see the logs" ] }, { "cell_type": "code", "execution_count": null, "id": "c623ae48-2936-472d-ba87-01668b358f04", "metadata": {}, "outputs": [], "source": [ "cur_dir = os.getcwd().replace(os.environ[\"HOME\"],\"\")\n", "HTML(f'''1. Paste the following command into the Studio Terminal tensorboard --logdir {tb_logging_path}\n", "
\n", "(You may need to run pip install tensorboard and pip install tensorflow_io if TesorBoard is not already installed)\n", "
\n", "2. Click here to open TensorBoard''')" ] }, { "cell_type": "markdown", "id": "e7c84173", "metadata": {}, "source": [ "## Cleanup" ] }, { "cell_type": "code", "execution_count": null, "id": "73706066", "metadata": {}, "outputs": [], "source": [ "!rm -rf data col_meta/" ] }, { "cell_type": "code", "execution_count": null, "id": "c9adecbc", "metadata": {}, "outputs": [], "source": [ "!aws s3 rm --recursive s3://{bucket}/{key_prefix}" ] }, { "cell_type": "code", "execution_count": null, "id": "bacfbb20", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "conda_amazonei_pytorch_latest_p37", "language": "python", "name": "conda_amazonei_pytorch_latest_p37" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.12" } }, "nbformat": 4, "nbformat_minor": 5 }