# TensorFlow Training at Scale

This notebook was tested with the `Data Science - Python 3` kernel running on an `ml.t3.medium` instance. Please make sure `Python 3 (Data Science)` is selected as the kernel in the top right of your notebook.

![img](https://user-images.githubusercontent.com/18154355/216501387-0a2fc1f9-205e-4466-b8f4-120e2e71b452.png)

## Overview

In this notebook, we'll use a Studio notebook to launch ephemeral SageMaker Training jobs on our full dataset.


## Loading stored variables
If you ran this notebook before, you may want to re-use the resources you already created with AWS. Run the cell below to load any previously created variables. You should see a print-out of the existing variables. If you don't see anything printed, then it's probably the first time you are running the notebook!

In [None]:
%store -r
%store

In [None]:
# Ensure updated SageMaker SDK version
%pip install -U -q sagemaker

# Write our training script to disk
We'll be using SageMaker Training with its prebuilt TensorFlow container optimized for AWS hardware. As such, we're going to create a training script which will be automatically packaged and shipped with our container thanks to the SageMaker SDK.

In [None]:
!mkdir -p src

In [None]:
%%writefile src/tf_train.py

import os
import argparse

import boto3
import tensorflow as tf
from tensorflow.keras.experimental import LinearModel, WideDeepModel
from tensorflow import keras
from sagemaker.experiments import load_run
from sagemaker.session import Session



class SageMakerExperimentCallback(keras.callbacks.Callback):
    """Keras callback that forwards per-epoch metrics to an experiment run.

    Args:
        run: Object exposing ``log_metric(name=..., value=..., step=...)``.
            It is stored on the instance and used at the end of every epoch.
    """

    # Keys of the Keras ``logs`` dict that are reported to the run.
    _TRACKED_METRICS = ("loss", "mse")

    def __init__(self, run):
        super().__init__()
        self.run = run

    def on_epoch_end(self, epoch, logs=None):
        """Log the tracked metrics for ``epoch`` to the run.

        Args:
            epoch: Epoch index, used as the metric ``step``.
            logs: Mapping of metric name to value. The signature allows
                ``None``, so it is treated as an empty mapping, and any
                tracked metric missing from it is skipped instead of raising.
        """
        logs = logs or {}
        for metric_name in self._TRACKED_METRICS:
            if metric_name in logs:
                self.run.log_metric(
                    name=metric_name,
                    value=float(logs[metric_name]),
                    step=epoch,
                )


def parse_args():
    """Parse the training script's command-line arguments.

    Hyperparameters (``--epochs``, ``--batch_size``, ``--learning_rate``) come
    from the command line. Data and model directories default to the
    ``SM_CHANNEL_TRAINING``, ``SM_CHANNEL_TESTING`` and ``SM_MODEL_DIR``
    environment variables when they are set.

    The environment is read with ``os.environ.get`` so the script can also be
    started outside a SageMaker container by passing ``--training`` explicitly.
    ``--training`` is only mandatory when ``SM_CHANNEL_TRAINING`` is absent.

    Returns:
        The ``(namespace, unknown_args)`` tuple produced by
        ``ArgumentParser.parse_known_args``; unrecognised arguments are
        returned rather than rejected.
    """
    parser = argparse.ArgumentParser()

    # hyperparameters sent by the client are passed as command-line arguments to the script
    parser.add_argument("--epochs", type=int, default=1)
    parser.add_argument("--batch_size", type=int, default=64)
    parser.add_argument("--learning_rate", type=float, default=0.1)

    # data directories
    training_channel = os.environ.get("SM_CHANNEL_TRAINING")
    parser.add_argument(
        "--training",
        type=str,
        default=training_channel,
        # Without the env var there is no usable default, so argparse should
        # report a clear error instead of the script failing later on None.
        required=training_channel is None,
    )
    parser.add_argument("--testing", type=str, default=os.environ.get("SM_CHANNEL_TESTING"))

    # model directory: we will use the default set by SageMaker, /opt/ml/model
    parser.add_argument("--model_dir", type=str, default=os.environ.get("SM_MODEL_DIR"))

    # Prefer the region advertised by the standard AWS SDK environment
    # variables; fall back to the previous hard-coded default.
    # NOTE(review): assumes the training container exports AWS_REGION or
    # AWS_DEFAULT_REGION - confirm for the image in use.
    default_region = (
        os.environ.get("AWS_REGION")
        or os.environ.get("AWS_DEFAULT_REGION")
        or "us-east-1"
    )
    parser.add_argument("--sagemaker_region", type=str, default=default_region)

    return parser.parse_known_args()


def get_train_data(train_dir, batch_size):
    """Create the batched, shuffled training dataset from CSV files.

    Every ``*.csv`` file directly under ``train_dir`` is read with
    ``tf.data.experimental.make_csv_dataset`` using ``fare_amount`` as the
    label. Each batch is then mapped to ``((linear_inputs, dnn_inputs), label)``.

    Args:
        train_dir: Directory (or path prefix) containing the CSV files.
        batch_size: Number of rows per batch.

    Returns:
        The mapped dataset.
    """
    csv_columns = [
        "day_of_week",
        "month",
        "hour",
        "pickup_location_id",
        "dropoff_location_id",
        "trip_distance",
        "fare_amount",
    ]
    # Columns cast to float32 before stacking; trip_distance is used as read.
    linear_cast_columns = ("day_of_week", "month", "hour")
    dnn_cast_columns = ("pickup_location_id", "dropoff_location_id")

    def pack(features, label):
        """Stack the per-column tensors into the linear and DNN input tensors."""
        distance = features["trip_distance"]

        linear_parts = [tf.cast(features[name], tf.float32) for name in linear_cast_columns]
        linear_parts.append(distance)

        dnn_parts = [tf.cast(features[name], tf.float32) for name in dnn_cast_columns]
        dnn_parts.append(distance)

        model_inputs = (tf.stack(linear_parts, axis=-1), tf.stack(dnn_parts, axis=-1))
        return model_inputs, label

    csv_files = tf.io.gfile.glob(train_dir + '/*.csv')
    dataset = tf.data.experimental.make_csv_dataset(
        csv_files,
        batch_size=batch_size,
        column_names=csv_columns,
        num_epochs=1,
        shuffle=True,
        label_name="fare_amount",
    )
    return dataset.map(pack)


if __name__ == "__main__":
    # Entry point: parse arguments, build the dataset, then train a
    # wide-and-deep model while reporting metrics to the experiment run.
    args, _ = parse_args()

    batch_size = args.batch_size
    epochs = args.epochs
    learning_rate = args.learning_rate
    train_dir = args.training
    region = args.sagemaker_region
    ds = get_train_data(train_dir, batch_size)

    # The SageMaker session is bound to an explicit region so that load_run
    # talks to the intended regional endpoint.
    boto_session = boto3.session.Session(region_name=region)
    sagemaker_session = Session(boto_session=boto_session)

    with load_run(sagemaker_session=sagemaker_session) as run:
        linear_model = LinearModel()
        # NOTE(review): the final layer uses a sigmoid activation although the
        # label is fare_amount trained with MSE; left unchanged here - confirm
        # this bounded DNN output is intentional.
        dnn_model = keras.Sequential([
            keras.layers.Flatten(),
            keras.layers.Dense(128, activation='elu'),
            keras.layers.Dense(64, activation='elu'),
            keras.layers.Dense(32, activation='elu'),
            keras.layers.Dense(1, activation='sigmoid'),
        ])
        combined_model = WideDeepModel(linear_model, dnn_model)

        # Apply the --learning_rate hyperparameter. Previously the optimizer
        # was given as the string "Adam", so the parsed value was silently
        # ignored and Adam's built-in default rate was used instead.
        optimizer = keras.optimizers.Adam(learning_rate=learning_rate)
        combined_model.compile(optimizer=optimizer, loss="mse", metrics=["mse"])

        # Keras documents `callbacks` as a list of callback instances.
        combined_model.fit(
            ds,
            epochs=epochs,
            callbacks=[SageMakerExperimentCallback(run)],
        )

In [None]:
%%writefile src/requirements.txt
sagemaker >= 2.123.0

In [None]:
import sagemaker
from sagemaker.tensorflow import TensorFlow
from sagemaker.experiments import Run

# Resolve the execution role and the default bucket that receives model output.
sess = sagemaker.Session()
role = sagemaker.get_execution_role()
bucket = sess.default_bucket()
output_bucket = f"s3://{bucket}/nyc-taxi/model/"

# Names under which the training job is tracked.
experiment_name = "TaxiFare-Experiment"
run_name = "TrainingJob-Run"

# Estimator configuration, collected up front so the launch code stays short.
estimator_settings = dict(
    source_dir="src",
    entry_point="tf_train.py",
    base_job_name="tf2-taxi-wide-deep",
    role=role,
    framework_version="2.6.2",
    py_version="py38",
    input_mode="File",
    output_path=output_bucket,
    instance_count=1,
    instance_type="ml.c4.xlarge",
    hyperparameters={"batch_size": 512, "epochs": 5},
)

# The estimator is created and fitted inside the Run context.
with Run(experiment_name=experiment_name, run_name=run_name) as run:
    tf_estimator = TensorFlow(**estimator_settings)

    # `data_bucket` is not defined in this cell; it must already exist in the
    # notebook namespace (presumably restored by `%store -r` - verify).
    input_channels = {
        "training": f"s3://{data_bucket}/train/",
        "testing": f"s3://{data_bucket}/test/",
    }
    tf_estimator.fit(input_channels, logs=True)

## View Results In SageMaker Experiments Tab