{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# TensorFlow Training at Scale\n", "\n", "This notebook is tested using `Data Science - Python 3 Kernel` running on a `ml.t3.medium` instance. Please ensure that you use `Python 3 (Data Science)` in the top right of your notebook.\n", "\n", "![img](https://user-images.githubusercontent.com/18154355/216501387-0a2fc1f9-205e-4466-b8f4-120e2e71b452.png)" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Overview\n", "\n", "In this notebook, we'll use a Studio notebook to launch ephemeral SageMaker Training jobs on our full dataset.\n" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Loading stored variables\n", "If you ran this notebook before, you may want to re-use the resources you already created with AWS. Run the cell below to load any previously created variables. You should see a print-out of the existing variables. If you don't see anything printed then it's probably the first time you are running the notebook!" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%store -r\n", "%store" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "# Ensure updated SageMaker SDK version\n", "%pip install -U -q sagemaker" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "# Write our training script to disk\n", "We'll be using SageMaker Training with its prebuilt TensorFlow container optimized for AWS hardware. As such, we're going to create a training script which will be automatically packaged and shipped with our container thanks to the SageMaker SDK."
] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "!mkdir -p src" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [
 "%%writefile src/tf_train.py\n",
 "\"\"\"SageMaker script-mode entry point: fits a Keras wide-and-deep regressor on CSV data.\"\"\"\n",
 "\n",
 "import os\n",
 "import argparse\n",
 "\n",
 "import boto3\n",
 "import tensorflow as tf\n",
 "from tensorflow.keras.experimental import LinearModel, WideDeepModel\n",
 "from tensorflow import keras\n",
 "from sagemaker.experiments import load_run\n",
 "from sagemaker.session import Session\n",
 "\n",
 "\n",
 "class SageMakerExperimentCallback(keras.callbacks.Callback):\n",
 "    \"\"\"Keras callback that logs the epoch-end `loss` and `mse` to a SageMaker Experiments run.\"\"\"\n",
 "\n",
 "    def __init__(self, run):\n",
 "        super().__init__()\n",
 "        self.run = run\n",
 "\n",
 "    def on_epoch_end(self, epoch, logs=None):\n",
 "        # `logs` is optional in the Keras callback API; log only the metrics that are present.\n",
 "        logs = logs or {}\n",
 "        for metric_name in (\"loss\", \"mse\"):\n",
 "            if metric_name in logs:\n",
 "                self.run.log_metric(name=metric_name, value=float(logs[metric_name]), step=epoch)\n",
 "\n",
 "\n",
 "def parse_args():\n",
 "    \"\"\"Parse hyperparameters and data/model locations from the command line.\n",
 "\n",
 "    Returns:\n",
 "        The `(namespace, unknown_args)` pair produced by `parse_known_args`, so\n",
 "        unrecognised arguments do not abort the job.\n",
 "    \"\"\"\n",
 "    parser = argparse.ArgumentParser()\n",
 "\n",
 "    # hyperparameters sent by the client are passed as command-line arguments to the script\n",
 "    parser.add_argument(\"--epochs\", type=int, default=1)\n",
 "    parser.add_argument(\"--batch_size\", type=int, default=64)\n",
 "    # This value used to be parsed but never handed to the optimizer, so training ran at\n",
 "    # Adam's Keras default; the default here keeps that effective learning rate.\n",
 "    parser.add_argument(\"--learning_rate\", type=float, default=0.001)\n",
 "\n",
 "    # data directories (.get so that a missing variable does not raise KeyError here)\n",
 "    parser.add_argument(\"--training\", type=str, default=os.environ.get(\"SM_CHANNEL_TRAINING\"))\n",
 "    parser.add_argument(\"--testing\", type=str, default=os.environ.get(\"SM_CHANNEL_TESTING\"))\n",
 "\n",
 "    # --model_dir may be overridden by the launcher (the SageMaker TensorFlow estimator can pass\n",
 "    # an S3 URI here); --sm_model_dir always defaults to the local SM_MODEL_DIR directory.\n",
 "    parser.add_argument(\"--model_dir\", type=str, default=os.environ.get(\"SM_MODEL_DIR\"))\n",
 "    parser.add_argument(\"--sm_model_dir\", \"--sm-model-dir\", type=str, default=os.environ.get(\"SM_MODEL_DIR\"))\n",
 "    parser.add_argument(\"--sagemaker_region\", type=str, default=os.environ.get(\"AWS_REGION\", \"us-east-1\"))\n",
 "\n",
 "    return parser.parse_known_args()\n",
 "\n",
 "\n",
 "def get_train_data(train_dir, batch_size):\n",
 "    \"\"\"Build a batched `tf.data` dataset from every `*.csv` file directly inside `train_dir`.\n",
 "\n",
 "    Each element is `((linear_features, dnn_features), label)`, where the label is the\n",
 "    `fare_amount` column.\n",
 "\n",
 "    Raises:\n",
 "        ValueError: if `train_dir` is empty or None.\n",
 "        FileNotFoundError: if no CSV file matches.\n",
 "    \"\"\"\n",
 "\n",
 "    def pack(features, label):\n",
 "        # Integer columns are cast so that they can be stacked with the float trip distance.\n",
 "        linear_features = [\n",
 "            tf.cast(features[\"day_of_week\"], tf.float32),\n",
 "            tf.cast(features[\"month\"], tf.float32),\n",
 "            tf.cast(features[\"hour\"], tf.float32),\n",
 "            features[\"trip_distance\"],\n",
 "        ]\n",
 "        dnn_features = [\n",
 "            tf.cast(features[\"pickup_location_id\"], tf.float32),\n",
 "            tf.cast(features[\"dropoff_location_id\"], tf.float32),\n",
 "            features[\"trip_distance\"],\n",
 "        ]\n",
 "        return (tf.stack(linear_features, axis=-1), tf.stack(dnn_features, axis=-1)), label\n",
 "\n",
 "    column_headers = [\n",
 "        \"day_of_week\",\n",
 "        \"month\",\n",
 "        \"hour\",\n",
 "        \"pickup_location_id\",\n",
 "        \"dropoff_location_id\",\n",
 "        \"trip_distance\",\n",
 "        \"fare_amount\",\n",
 "    ]\n",
 "\n",
 "    if not train_dir:\n",
 "        raise ValueError(\"No training directory given: pass --training or set SM_CHANNEL_TRAINING\")\n",
 "\n",
 "    csv_files = tf.io.gfile.glob(os.path.join(train_dir, \"*.csv\"))\n",
 "    if not csv_files:\n",
 "        raise FileNotFoundError(f\"No CSV files found in {train_dir!r}\")\n",
 "\n",
 "    ds = tf.data.experimental.make_csv_dataset(\n",
 "        csv_files,\n",
 "        batch_size=batch_size,\n",
 "        column_names=column_headers,\n",
 "        num_epochs=1,\n",
 "        shuffle=True,\n",
 "        label_name=\"fare_amount\",\n",
 "    )\n",
 "    return ds.map(pack)\n",
 "\n",
 "\n",
 "if __name__ == \"__main__\":\n",
 "    args, _ = parse_args()\n",
 "\n",
 "    ds = get_train_data(args.training, args.batch_size)\n",
 "\n",
 "    boto_session = boto3.session.Session(region_name=args.sagemaker_region)\n",
 "    sagemaker_session = Session(boto_session=boto_session)\n",
 "\n",
 "    with load_run(sagemaker_session=sagemaker_session) as run:\n",
 "        linear_model = LinearModel()\n",
 "        dnn_model = keras.Sequential([\n",
 "            keras.layers.Flatten(),\n",
 "            keras.layers.Dense(128, activation=\"elu\"),\n",
 "            keras.layers.Dense(64, activation=\"elu\"),\n",
 "            keras.layers.Dense(32, activation=\"elu\"),\n",
 "            # Linear output: the label is an unbounded fare amount, and a sigmoid would\n",
 "            # restrict the deep tower's contribution to the interval (0, 1).\n",
 "            keras.layers.Dense(1),\n",
 "        ])\n",
 "        combined_model = WideDeepModel(linear_model, dnn_model)\n",
 "        combined_model.compile(\n",
 "            optimizer=keras.optimizers.Adam(learning_rate=args.learning_rate),\n",
 "            loss=\"mse\",\n",
 "            metrics=[\"mse\"],\n",
 "        )\n",
 "\n",
 "        combined_model.fit(ds, epochs=args.epochs, callbacks=[SageMakerExperimentCallback(run)])\n",
 "\n",
 "    # Write the trained model into a numbered sub-directory of the local model directory.\n",
 "    if args.sm_model_dir:\n",
 "        combined_model.save(os.path.join(args.sm_model_dir, \"1\"))"
 ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [ "%%writefile
src/requirements.txt\n", "sagemaker >= 2.123.0" ] }, { "cell_type": "code", "execution_count": null, "metadata": { "tags": [] }, "outputs": [], "source": [
 "import sagemaker\n",
 "from sagemaker.tensorflow import TensorFlow\n",
 "from sagemaker.experiments import Run\n",
 "\n",
 "# `data_bucket` is not assigned anywhere in this notebook; it is only available when the\n",
 "# `%store -r` cell restored it. Fail early with an actionable message on a fresh kernel.\n",
 "if \"data_bucket\" not in globals():\n",
 "    raise NameError(\n",
 "        \"`data_bucket` is not defined: restore it with `%store -r` or assign the S3 bucket \"\n",
 "        \"that holds the train/ and test/ prefixes before running this cell.\"\n",
 "    )\n",
 "\n",
 "sess = sagemaker.Session()\n",
 "role = sagemaker.get_execution_role()\n",
 "bucket = sess.default_bucket()\n",
 "output_bucket = f\"s3://{bucket}/nyc-taxi/model/\"\n",
 "\n",
 "\n",
 "experiment_name = \"TaxiFare-Experiment\"\n",
 "run_name = \"TrainingJob-Run\"\n",
 "\n",
 "# The estimator is created and fitted inside the Run context; the same session object is\n",
 "# passed to both so they use one set of credentials and one region.\n",
 "with Run(experiment_name=experiment_name, run_name=run_name, sagemaker_session=sess) as run:\n",
 "    tf_estimator = TensorFlow(\n",
 "        source_dir=\"src\",\n",
 "        entry_point=\"tf_train.py\",\n",
 "        base_job_name=\"tf2-taxi-wide-deep\",\n",
 "        role=role,\n",
 "        sagemaker_session=sess,\n",
 "        framework_version=\"2.6.2\",\n",
 "        py_version=\"py38\",\n",
 "        input_mode=\"File\",\n",
 "        output_path=output_bucket,\n",
 "        instance_count=1,\n",
 "        instance_type=\"ml.c4.xlarge\",\n",
 "        hyperparameters={\"batch_size\": 512, \"epochs\": 5},\n",
 "    )\n",
 "\n",
 "    tf_estimator.fit(\n",
 "        {\n",
 "            \"training\": f\"s3://{data_bucket}/train/\",\n",
 "            \"testing\": f\"s3://{data_bucket}/test/\",\n",
 "        },\n",
 "        logs=True,\n",
 "    )"
 ] }, { "cell_type": "markdown", "metadata": { "tags": [] }, "source": [ "## View Results In SageMaker Experiments Tab" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "instance_type": "ml.t3.medium", "kernelspec": { "display_name": "Python 3 (Data Science)", "language": "python", "name": "python3__SAGEMAKER_INTERNAL__arn:aws:sagemaker:us-east-1:081325390199:image/datascience-1.0" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.7.10" } }, "nbformat": 4, "nbformat_minor": 4 }