# Data Parallel on Amazon SageMaker Training with PyTorch Lightning and Warm Pools
In this lab we'll write our own deep neural network using PyTorch Lightning and train it on Amazon SageMaker using multiple GPUs. For more details, [see our blog post here.](https://aws.amazon.com/blogs/machine-learning/run-pytorch-lightning-and-native-pytorch-ddp-on-amazon-sagemaker-training-featuring-amazon-search/)

### Step 0. Update the SageMaker Python SDK and AWS botocore

In [None]:
%pip install --upgrade sagemaker
%pip install boto3 --upgrade
%pip install botocore --upgrade

In [None]:
from packaging import version
import botocore

your_botocore_version = botocore.__version__

minimal_version = '1.27.90'

if version.parse(your_botocore_version) >= version.parse(minimal_version):
    print('You are all set! Please enjoy the lab')
else:
    print('Stop! Please install the packages in the cell above before continuing.')

In [None]:
# Put a string here so you can tell which jobs are yours; no punctuation or spaces
your_user_string = 'emily'

### Step 1. Upload a dataset to your S3 bucket
The example script downloads MNIST directly onto the training instance through its data loader, which bypasses S3 entirely. To show the typical workflow anyway, we'll upload some sample data from your notebook to S3 and then have SageMaker copy it from S3 onto the training instances. This pattern is what you'd use for larger datasets and for persistent storage.

In [None]:
%%writefile train.csv
this,is,my,arbitrary,csv,file

In [None]:
import sagemaker

sess = sagemaker.Session()

# Optionally, point to any other bucket you have access to
bucket = sess.default_bucket()

In [None]:
s3_train_path = 's3://{}/data/mnist/'.format(bucket)

In [None]:
!aws s3 cp train.csv {s3_train_path}

### Step 2. Write train script and requirements into a local directory, here named `scripts`

In [None]:
!mkdir -p scripts

In [None]:
%%writefile scripts/requirements.txt
pytorch-lightning == 1.6.3
lightning-bolts == 0.5.0

In [None]:
%%writefile scripts/mnist.py

import os
import json
import argparse

import torch
from torch.nn import functional as F

import pytorch_lightning as pl
from pytorch_lightning.strategies import DDPStrategy
from pytorch_lightning.plugins.environments.lightning_environment import LightningEnvironment
from pl_bolts.datamodules.mnist_datamodule import MNISTDataModule


class LitClassifier(pl.LightningModule):
    def __init__(self, hidden_dim: int = 128, learning_rate: float = 0.0001):
        super().__init__()
        self.save_hyperparameters()

        self.l1 = torch.nn.Linear(28 * 28, self.hparams.hidden_dim)
        self.l2 = torch.nn.Linear(self.hparams.hidden_dim, 10)

    def forward(self, x):
        # Flatten each 28x28 image and return raw logits; no activation on the
        # output layer, because F.cross_entropy applies log-softmax itself
        x = x.view(x.size(0), -1)
        x = torch.relu(self.l1(x))
        return self.l2(x)

    def training_step(self, batch, batch_idx):
        x, y = batch
        logits = self(x)
        loss = F.cross_entropy(logits, y)
        return loss

    def validation_step(self, batch, batch_idx):
        x, y = batch
        return self.accuracy(self(x), y)

    def test_step(self, batch, batch_idx):
        x, y = batch
        return self.accuracy(self(x), y)

    def accuracy(self, logits, y):
        return (logits.argmax(dim=-1) == y).float().mean()

    def validation_epoch_end(self, outputs) -> None:
        # sync_dist=True averages the epoch metric across all DDP processes
        self.log("val_acc", torch.stack(outputs).mean(), prog_bar=True, sync_dist=True)

    def test_epoch_end(self, outputs) -> None:
        self.log("test_acc", torch.stack(outputs).mean(), sync_dist=True)

    def configure_optimizers(self):
        return torch.optim.Adam(self.parameters(), lr=self.hparams.learning_rate)


def parse_args():
    parser = argparse.ArgumentParser()

    # SM_HOSTS is a JSON-encoded list such as '["algo-1"]', so decode it
    # instead of calling len() on the raw string (which counts characters)
    hosts = json.loads(os.environ["SM_HOSTS"])
    num_gpus = int(os.environ["SM_NUM_GPUS"])  # GPUs per node

    parser.add_argument("--hosts", type=json.loads, default=hosts)
    parser.add_argument("--current-host", type=str, default=os.environ["SM_CURRENT_HOST"])
    parser.add_argument("--model-dir", type=str, default=os.environ["SM_MODEL_DIR"])
    parser.add_argument("--train-dir", type=str, default=os.environ["SM_CHANNEL_TRAIN"])
    parser.add_argument("--num-gpus", type=int, default=num_gpus)
    parser.add_argument("--num_nodes", type=int, default=len(hosts))
    parser.add_argument("--world-size", type=int, default=num_gpus * len(hosts))

    # Hyperparameters set on the estimator
    parser.add_argument("--batch_size", type=int, default=int(os.environ["SM_HP_BATCH_SIZE"]))
    parser.add_argument("--epochs", type=int, default=int(os.environ["SM_HP_EPOCHS"]))

    return parser.parse_args()


if __name__ == "__main__":
    args = parse_args()

    # List the files SageMaker copied from the S3 "train" channel
    print("Here is the sample CSV training file:")
    print(os.listdir(args.train_dir))

    dm = MNISTDataModule(batch_size=args.batch_size)  # batch size per GPU process
    model = LitClassifier()

    # Pin this process to the GPU assigned by the launcher
    local_rank = int(os.environ["LOCAL_RANK"])
    torch.cuda.set_device(local_rank)

    # Read world size and global rank from the variables set by the DDP launcher
    env = LightningEnvironment()
    env.world_size = lambda: int(os.environ.get("WORLD_SIZE", 0))
    env.global_rank = lambda: int(os.environ.get("RANK", 0))

    ddp = DDPStrategy(cluster_environment=env, accelerator="gpu")

    trainer = pl.Trainer(
        max_epochs=args.epochs,
        strategy=ddp,
        devices=args.num_gpus,
        num_nodes=args.num_nodes,
        default_root_dir=args.model_dir,
    )
    trainer.fit(model, datamodule=dm)
    trainer.test(model, datamodule=dm)
 

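SageMaker sets `SM_HOSTS` to a JSON-encoded list of host names, so the node count has to come from decoding that list; calling `len()` on the raw string counts its characters instead. The minimal sketch below shows the difference, using a plain dictionary in place of `os.environ`. `cluster_shape` is a hypothetical helper, and the host names and GPU count are example values for a single `ml.g4dn.12xlarge` instance.

```python
import json


def cluster_shape(env):
    """Return (num_nodes, world_size) from SageMaker-style environment variables."""
    hosts = json.loads(env["SM_HOSTS"])      # e.g. '["algo-1"]' -> ["algo-1"]
    gpus_per_node = int(env["SM_NUM_GPUS"])  # GPUs on each instance
    num_nodes = len(hosts)
    return num_nodes, gpus_per_node * num_nodes


# Hypothetical values for one instance with 4 GPUs
env = {"SM_HOSTS": '["algo-1"]', "SM_NUM_GPUS": "4"}
print(len(env["SM_HOSTS"]))  # 10: len() of the raw string counts characters
print(cluster_shape(env))    # (1, 4)
```

With `len()` on the raw string, a single-node job would report 10 nodes, which no longer matches the cluster SageMaker actually launched.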
### Step 3. Configure the SageMaker Training Estimator
In this step you are using the [SageMaker Python SDK](https://sagemaker.readthedocs.io/en/stable/api/training/estimators.html) as a wrapper around the core API, `create-training-job`, [as described here.](https://docs.aws.amazon.com/cli/latest/reference/sagemaker/create-training-job.html)

Read more about [training on SageMaker here,](https://docs.aws.amazon.com/sagemaker/latest/dg/how-it-works-training.html) with distributed training details [here.](https://docs.aws.amazon.com/sagemaker/latest/dg/distributed-training.html)


You can see [instance details for SageMaker here,](https://aws.amazon.com/sagemaker/pricing/) along with [instance specs from EC2 directly here.](https://aws.amazon.com/ec2/instance-types/)

In [None]:
import sagemaker
from sagemaker.pytorch import PyTorch

sagemaker_session = sagemaker.Session()
role = sagemaker.get_execution_role()
region = sagemaker_session.boto_region_name

# Hard-code the URI of the AWS Deep Learning Container (DLC) image for PyTorch 1.12 GPU training
# (the registry account ID differs in some opt-in regions)
image_uri = '763104351884.dkr.ecr.{}.amazonaws.com/pytorch-training:1.12.0-gpu-py38-cu113-ubuntu20.04-sagemaker'.format(region)

estimator = PyTorch(
    entry_point="mnist.py",
    base_job_name="{}-ddp-mnist".format(your_user_string),
    image_uri=image_uri,
    role=role,
    source_dir="scripts",
    # Configures the SageMaker training resource; increase as needed
    instance_count=1,
    instance_type="ml.g4dn.12xlarge",
    py_version="py38",
    sagemaker_session=sagemaker_session,
    distribution={"pytorchddp": {"enabled": True}},
    debugger_hook_config=False,
    # profiler_config=profiler_config,
    hyperparameters={"batch_size": 32, "epochs": 300},
    # Enable warm pools: keep the instance available for 20 minutes after the job ends
    keep_alive_period_in_seconds=20 * 60,
)

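The `hyperparameters` dictionary reaches `mnist.py` in two ways: SageMaker passes each entry to the entry point as a command-line flag (for example `--batch_size 32`) and also exposes it as an `SM_HP_<NAME>` environment variable, which is why the script can fall back on `SM_HP_BATCH_SIZE`. The sketch below simulates both views locally and parses them with the same `argparse` pattern the script uses. `to_cli_args` and `to_env_vars` are hypothetical helpers, a simplified model of the toolkit's behavior whose serialization details (such as quoting of string values) may differ.

```python
import argparse


def to_cli_args(hyperparameters):
    """Simplified model of how hyperparameters become command-line flags."""
    args = []
    for name, value in hyperparameters.items():
        args += ["--" + name, str(value)]
    return args


def to_env_vars(hyperparameters):
    """Simplified model of the SM_HP_* environment variables."""
    return {"SM_HP_" + name.upper(): str(value) for name, value in hyperparameters.items()}


hyperparameters = {"batch_size": 32, "epochs": 300}
cli = to_cli_args(hyperparameters)
env = to_env_vars(hyperparameters)

parser = argparse.ArgumentParser()
parser.add_argument("--batch_size", type=int, default=int(env["SM_HP_BATCH_SIZE"]))
parser.add_argument("--epochs", type=int, default=int(env["SM_HP_EPOCHS"]))
args = parser.parse_args(cli)

print(cli)                       # ['--batch_size', '32', '--epochs', '300']
print(args.batch_size, args.epochs)  # 32 300
```

Reading the environment variable as the `argparse` default keeps the script working even if it is launched without the flags.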
In [None]:
# wait=True blocks this cell and streams the job logs until training finishes; wait=False returns right away. Both create a training job.
# 'train' names the input channel; use any name you like, with up to 20 channels per job.
# Inside the container, the channel's local path is exposed as SM_CHANNEL_TRAIN.
estimator.fit(wait=True, inputs={'train': s3_train_path})

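Each key in `inputs` becomes a channel: by default SageMaker copies the S3 prefix into `/opt/ml/input/data/<channel_name>` on every training instance and exposes that path as `SM_CHANNEL_<CHANNEL_NAME>`, which is how `mnist.py` finds `SM_CHANNEL_TRAIN`. The sketch below mirrors that naming convention so you can dry-run channel handling outside SageMaker. The helper names are hypothetical, and the temporary directory stands in for the real channel path.

```python
import os
import tempfile


def channel_env_var(channel_name):
    """Environment variable that holds the local path of an input channel."""
    return "SM_CHANNEL_" + channel_name.upper()


def channel_local_dir(channel_name, base="/opt/ml/input/data"):
    """Default local directory where SageMaker places a channel's files."""
    return os.path.join(base, channel_name)


def list_channel_files(channel_name, env=os.environ):
    """List a channel's files, preferring the SM_CHANNEL_* variable if set."""
    path = env.get(channel_env_var(channel_name), channel_local_dir(channel_name))
    return sorted(os.listdir(path))


# Local dry run: point SM_CHANNEL_TRAIN at a temporary directory with one CSV file
with tempfile.TemporaryDirectory() as tmp:
    open(os.path.join(tmp, "train.csv"), "w").close()
    fake_env = {channel_env_var("train"): tmp}
    print(list_channel_files("train", fake_env))  # ['train.csv']
```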
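### Step 4. Rerun the job with a higher batch size to increase GPU utilization
Because the Step 3 job keeps its instance in a warm pool for 20 minutes, a new job with a matching resource configuration (same instance type and count, among other settings) that starts within that window can reuse the instance instead of waiting for a fresh one to be provisioned.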

In [None]:
estimator = PyTorch(
    entry_point="mnist.py",
    base_job_name="{}-ddp-mnist".format(your_user_string),
    image_uri=image_uri,
    role=role,
    source_dir="scripts",
    # Configures the SageMaker training resource; increase as needed
    instance_count=1,
    instance_type="ml.g4dn.12xlarge",
    py_version="py38",
    sagemaker_session=sagemaker_session,
    distribution={"pytorchddp": {"enabled": True}},
    debugger_hook_config=False,
    # max_retry_attempts=5,
    hyperparameters={"batch_size": 320, "epochs": 900},
    # Don't retain this job's instance in a warm pool after it finishes
    keep_alive_period_in_seconds=0,
)

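`keep_alive_period_in_seconds` controls how long SageMaker retains a finished job's instance in a warm pool: Step 3 uses `20 * 60` = 1200 seconds, and this step uses 0, which disables retention. As far as we know the service accepts values from 0 to 3600 seconds; that upper bound is an assumption here, so check the current SageMaker documentation. `keep_alive_seconds` is a hypothetical helper that catches out-of-range values before you submit a job.

```python
MAX_KEEP_ALIVE_SECONDS = 3600  # assumed service maximum (one hour)


def keep_alive_seconds(minutes):
    """Convert minutes to a keep_alive_period_in_seconds value, checking the range."""
    seconds = int(minutes * 60)
    if not 0 <= seconds <= MAX_KEEP_ALIVE_SECONDS:
        raise ValueError(f"keep-alive must be 0-{MAX_KEEP_ALIVE_SECONDS} s, got {seconds}")
    return seconds


print(keep_alive_seconds(20))  # 1200, the value used in Step 3
print(keep_alive_seconds(0))   # 0, the value used in this step (no warm pool)
```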
In [None]:
estimator.fit(wait=False, inputs = {'train':s3_train_path})

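In this script `batch_size` is the per-process (per-GPU) batch size, because each DDP process builds its own data loader. Raising it from 32 to 320 gives each GPU ten times more work per step and cuts the number of steps, and therefore the gradient synchronizations, per epoch. The arithmetic below is a sketch that assumes 4 GPUs on `ml.g4dn.12xlarge`, all 60,000 MNIST training images, and a `DistributedSampler` that pads to equal shards and keeps the last partial batch; if the data module holds out part of the training set for validation, the step counts will be lower.

```python
import math


def global_batch_size(per_gpu_batch, gpus_per_node, num_nodes=1):
    """Samples processed per optimizer step across all DDP processes."""
    return per_gpu_batch * gpus_per_node * num_nodes


def steps_per_epoch(num_samples, per_gpu_batch, world_size):
    """Steps per epoch when the data is split into equal shards per process
    and the last partial batch is kept."""
    samples_per_process = math.ceil(num_samples / world_size)
    return math.ceil(samples_per_process / per_gpu_batch)


for per_gpu_batch in (32, 320):
    print(per_gpu_batch,
          global_batch_size(per_gpu_batch, gpus_per_node=4),
          steps_per_epoch(60_000, per_gpu_batch, world_size=4))
# 32 128 469
# 320 1280 47
```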
### Step 5. Optimize!
That's the end of this lab. In the real world, however, this is just the beginning. Here are a few more things you can do to (hopefully) increase model accuracy and reduce job runtime.
1. **Increase throughput by adding extra nodes.** Increase `instance_count`. As long as your training script uses some form of distribution, such as the DDP strategy used here, every accelerator on every node gets a copy of your model and its own slice of the data, and the gradients are averaged across all copies after each step. This is also called ***horizontal scaling.***
2. **Increase throughput by upgrading your instances.** In addition to (or sometimes instead of) adding extra nodes, you can switch to a larger instance type. This usually means more accelerators, more CPU, more memory, and more network bandwidth.
3. **Increase accuracy with hyperparameter tuning.** Another critical step is picking the right hyperparameters. For example, you can use [Syne Tune](https://github.com/awslabs/syne-tune/blob/hf_blog_post/hf_blog_post/example_syne_tune_for_hf.ipynb) to tune against multiple objectives. Amazon SageMaker Automatic Model Tuning now provides up to three times faster hyperparameter tuning with [Hyperband](https://aws.amazon.com/blogs/machine-learning/amazon-sagemaker-automatic-model-tuning-now-provides-up-to-three-times-faster-hyperparameter-tuning-with-hyperband/). Here's another example that uses [SageMaker Training Compiler to tune the batch size!](https://github.com/aws/amazon-sagemaker-examples/blob/main/sagemaker-training-compiler/huggingface/pytorch_tune_batch_size/finding_max_batch_size_for_model_training.ipynb)
4. **Increase accuracy by adding more parameters to your model, and using a model parallel strategy.** [This is the content of the next lab!](https://github.com/aws-samples/sagemaker-distributed-training-workshop/blob/main/2_model_parallel/smp-train-gpt-simple.ipynb)
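When you add nodes, each GPU process gets a global rank, and under the usual convention it is computed as `node_rank * gpus_per_node + local_rank`. After each backward pass, DDP all-reduces the gradients so that every replica ends up with their average. The sketch below simulates both ideas in plain Python for a hypothetical 2-instance, 4-GPU-per-instance cluster. It is a conceptual illustration only; DDP itself performs the averaging with collective communication (for example NCCL) over gradient buckets.

```python
def global_rank(node_rank, local_rank, gpus_per_node):
    """Rank of one process across the whole cluster."""
    return node_rank * gpus_per_node + local_rank


def average_gradients(per_replica_grads):
    """Element-wise mean of each replica's gradient vector, i.e. the value
    every replica holds after DDP's all-reduce and divide-by-world-size."""
    world_size = len(per_replica_grads)
    return [sum(values) / world_size for values in zip(*per_replica_grads)]


# Hypothetical cluster: instance_count=2, 4 GPUs per instance -> world size 8
gpus_per_node, num_nodes = 4, 2
ranks = [global_rank(node, local, gpus_per_node)
         for node in range(num_nodes) for local in range(gpus_per_node)]
print(ranks)  # [0, 1, 2, 3, 4, 5, 6, 7]

# Two replicas with different local gradients agree after averaging
print(average_gradients([[1.0, 2.0], [3.0, 4.0]]))  # [2.0, 3.0]
```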