# 2D Semantic Segmentation on the Audi A2D2 Dataset
In this notebook, we train a model on the 2D semantic segmentation annotations from the Audi A2D2 Dataset https://www.a2d2.audi/a2d2/en/dataset.html. The dataset can also be accessed from the AWS Open Data Registry https://registry.opendata.aws/aev-a2d2/.

We do the following: 

 1. We download the semantic segmentation dataset archive
 1. We inspect and describe the data
 1. We run local processing to produce a dataset manifest (list of all records), and split the data into training and validation sets.
 1. We send the data to Amazon S3
 1. We create a PyTorch script training a DeepLabV3 model, which we test locally for a few iterations
 1. We launch our training script on a remote, long-running machine with the SageMaker Training API.
 1. We show how to run Bayesian parameter search to tune the metric of your choice (loss, accuracy, throughput...). To keep costs low, this is deactivated by default
 1. We open a model checkpoint (collected in parallel to training by SageMaker Training) to check prediction quality

The demo was created from a SageMaker ml.g4dn.16xlarge Notebook instance, with a Jupyter Kernel `conda_pytorch_latest_p37` (`torch 1.8.1+cu111`, `torchvision 0.9.1+cu111`). Feel free to use a different instance for the download and pre-processing step so that a GPU doesn't sit idle, and switch to a different instance type later. Note that the dataset download and extraction **do not run well on SageMaker Studio Notebooks**, whose storage is EFS based and struggles to handle the 80k+ files composing the dataset. Launching API calls (training job, tuning jobs) from Studio should run fine though.

**IMPORTANT NOTES**

* **This sample is written for single-GPU instances only. Using machines with more than one GPU or running the training code on more than one machine will not use all available hardware**

* **Running this demo requires at least 400 GB of local storage space**

* **Running this demo on an ml.g4dn.16xlarge instance in region eu-west-1 takes approximately 50 min of notebook uptime and approximately 12 h of SageMaker Training job execution (excluding the Bayesian parameter search, deactivated by default). This represents approximately 6 USD of notebook usage (if running on ml.g4dn.16xlarge) and 72 USD of training API usage**

* **This demo uses non-AWS, open-source libraries including PyTorch, PIL, matplotlib, Torchvision. Use appropriate due diligence to verify if that use fits the software standards and compliance rules in place at your organization** 

* **This sample is provided for demonstration purposes. Make sure to conduct appropriate testing if deriving this code for your own use cases. In general, it is recommended to isolate development from production environments. Read more in the AWS Well-Architected Framework https://aws.amazon.com/architecture/well-architected/**

In [None]:
import json
import multiprocessing as mp
import os
import time
import uuid

import boto3
from PIL import Image
import sagemaker
from sagemaker import Session
from sagemaker.pytorch import PyTorch
from sagemaker import get_execution_role


sess = Session()
bucket = '' # SageMaker will use this bucket to store data, script and model checkpoints

s3 = boto3.client('s3')

# 1. Dataset preparation

In [None]:
# Data will be downloaded there, and new folders created. Feel free to customize
work_dir = '/home/ec2-user/SageMaker'
dataset_prefix = 'a2d2_images'
data_dir = work_dir + '/' + dataset_prefix

# locations used for local testing
local_dataset_cache = work_dir + '/a2d2-tmp'
local_checkpoint_location = work_dir + '/a2d2-checkpoints'

### Download files

In [None]:
%%time

# Download images. This took 12min on a ml.g4dn.16xlarge instance in eu-west-1 region
! aws s3 cp s3://aev-autonomous-driving-dataset/camera_lidar_semantic.tar $work_dir

In [None]:
%%time

# Download labels
! aws s3 cp s3://aev-autonomous-driving-dataset/camera_lidar_semantic_instance.tar $work_dir

In [None]:
%%time

# Download the README
! aws s3 cp s3://aev-autonomous-driving-dataset/README-SemSeg.txt $work_dir

### Uncompress
This takes about 20min

In [None]:
# We create a new folder dedicated to the A2D2 dataset
print('Creating folder {}'.format(data_dir))

try:
    os.mkdir(data_dir)

except FileExistsError:
    print('Directory already exists')

In [None]:
%%time

! tar -xf {work_dir}/camera_lidar_semantic.tar -C $data_dir

### Analyse dataset structure
We check how labels and images are organized. This was necessary to build an appropriate Dataset class

In [None]:
# Frames are grouped in 23 sequences
data_folder = 'camera_lidar_semantic'
os.listdir(os.path.join(data_dir, data_folder))

In [None]:
# each sequence contains folders for labels, lidar and camera captures
os.listdir(os.path.join(data_dir, data_folder, '20180925_112730'))

In [None]:
# each of those folders contains one or more folders, depending on the camera that captured the data
os.listdir(os.path.join(data_dir, data_folder, '20180925_112730/camera'))

In [None]:
# 10 first records of the front center camera capture of the 2018-09-25 11:27:30 sequence
os.listdir(os.path.join(data_dir, data_folder, '20180925_112730/camera/cam_front_center'))[:10]

In [None]:
# view one image

image_id = '000074771'


with Image.open(os.path.join(data_dir, data_folder, '20180925_112730/camera/cam_front_center/'
                             + '20180925112730_camera_frontcenter_{}.png'.format(image_id))) as im:

    im.show()

In [None]:
# view associated label

with Image.open(os.path.join(data_dir, data_folder, '20180925_112730/label/cam_front_center/'
                             + '20180925112730_label_frontcenter_{}.png'.format(image_id))) as im:

    im.show()

### Anomalies to watch out for

* As of October 2021, the record `a2d2_images/camera_lidar_semantic/20180925_135056/label/cam_side_left/20180925135056_label_sideleft_000026512.png` loads as a 4-channel image

### Pre-process
To simplify the ML process, we build a flat JSON manifest that maps each record ID to the paths of its image and of its label
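
As a minimal sketch of what one manifest entry looks like, the snippet below derives a record ID from a camera file name and builds the matching record. The helper name `build_record` and its `prefix` default are hypothetical; the ID rule and the key names mirror the ones used in the manifest cell of this notebook.

```python
import json


def build_record(sequence, camera, image_name,
                 prefix='a2d2_images/camera_lidar_semantic'):
    """Return (record_id, record) for one camera frame (hypothetical helper)."""
    # label files share the image name, with 'camera' swapped for 'label'
    label_name = image_name.replace('camera', 'label')
    stem = image_name[:image_name.rfind('.')]
    # record ID = capture timestamp + the 9-digit frame number
    record_id = image_name[:image_name.find('_')] + '_' + stem[-9:]
    record = {
        'sequence_id': sequence,
        'image_name': image_name,
        'label_name': label_name,
        'image_path': '{}/{}/camera/{}/{}'.format(prefix, sequence, camera, image_name),
        'label_path': '{}/{}/label/{}/{}'.format(prefix, sequence, camera, label_name),
    }
    return record_id, record


record_id, record = build_record(
    '20180925_112730',
    'cam_front_center',
    '20180925112730_camera_frontcenter_000074771.png')

print(record_id)
print(json.dumps(record, indent=2))
```

Paths are stored relative to the working directory, so the same manifest can be used both as a local path suffix and as an S3 key.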

In [None]:
root = os.path.join(data_dir, data_folder) # where we'll read images from
relative = os.path.join(dataset_prefix, data_folder) # the image key prefix we'll use to write images in S3

# we sort sequences so that train-test split by sequence index is deterministic
sequences = [s for s in os.listdir(root) if s.startswith('2018')]
sequences.sort()
print(sequences)

In [None]:
manifest = {}

for s in sequences:
    cameras = os.listdir(root + '/{}/camera'.format(s))
    for c in cameras:

        images = [f for f in os.listdir(root + '/{}/camera/{}'.format(s, c))
                  if f.endswith('.png')]

        for i in images:
            label_name = i.replace('camera', 'label')
            im_id = i[:i.find('_')] + '_' + i[i.rfind('.')-9:i.rfind('.')]
            image_path_local = root + '/{}/camera/{}/{}'.format(s, c, i)
            label_path_local = root + '/{}/label/{}/{}'.format(s, c, label_name)
            image_path_manifest = relative + '/{}/camera/{}/{}'.format(s, c, i)
            label_path_manifest = relative + '/{}/label/{}/{}'.format(s, c, label_name)

            # create record only if both image file and label file exist:
            if os.path.isfile(image_path_local) and os.path.isfile(label_path_local):
                manifest[im_id] = {}
                manifest[im_id]['sequence_id'] = s
                manifest[im_id]['image_name'] = i
                manifest[im_id]['label_name'] = label_name
                # remove the work-dir from the path so that the manifest stays small and generic
                manifest[im_id]['image_path'] = image_path_manifest
                manifest[im_id]['label_path'] = label_path_manifest
            else:
                print('issue with image {} : -------'.format(image_path_local))
                # report which of the image and label files is missing
                print('image file {} exists: {}'.format(image_path_local, os.path.isfile(image_path_local)))
                print('label file {} exists: {}'.format(label_path_local, os.path.isfile(label_path_local)))
 
print("Created a dataset manifest with {} records".format(len(manifest)))

We then send images to S3 with a multi-processing call. This should take 10-15min on a large G4 instance and results in 139 GB on S3. You can try to go faster by using more workers in `multiprocessing.Pool(workers)`, but be aware that too much concurrency may cause instability and crashes in your kernel and instance
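
If you want to experiment with the level of concurrency, one possible variant is sketched below: it caps the number of simultaneous uploads and collects failures instead of stopping at the first error. This is an illustration, not the approach used in this notebook: it swaps the process pool for threads, and `upload_all` and `fake_upload` are hypothetical names (the stand-in uploader lets the snippet run without S3 access).

```python
from concurrent.futures import ThreadPoolExecutor


def upload_all(keys, upload_one, workers=8):
    """Call upload_one(key) for every key, with at most `workers` calls in flight.

    Returns a list of (key, exception) pairs for the uploads that failed.
    """
    def attempt(key):
        try:
            upload_one(key)
            return None
        except Exception as error:  # record the failure and keep going
            return (key, error)

    with ThreadPoolExecutor(max_workers=workers) as pool:
        results = list(pool.map(attempt, keys))

    return [result for result in results if result is not None]


# Stand-in uploader for illustration only; with S3 access you would pass
# a function that uploads the image and label of one record instead
def fake_upload(key):
    if key == 'bad':
        raise IOError('simulated upload failure')


failures = upload_all(['a', 'b', 'bad', 'c'], fake_upload, workers=2)
print([key for key, _ in failures])
```

Whether threads or processes are faster here depends on the instance and file sizes, so treat the worker count as something to measure rather than a recommendation.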

In [None]:
def send_images_to_s3(image_id):

    s3.upload_file(Filename=work_dir + '/' + manifest[image_id]['image_path'],
                   Bucket=bucket,
                   Key=manifest[image_id]['image_path'])

    s3.upload_file(Filename=work_dir + '/' + manifest[image_id]['label_path'],
                   Bucket=bucket,
                   Key=manifest[image_id]['label_path'])

In [None]:
%%time

with mp.Pool(mp.cpu_count()) as pool:

    pool.map(send_images_to_s3, manifest.keys())

In [None]:
# we also need to send class_list to S3
s3.upload_file(
    Filename=root + '/' + 'class_list.json',
    Bucket=bucket,
    Key=dataset_prefix + '/metadata/class_list.json')

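We split the dataset into a training manifest and a validation manifest. The split is done by sequence, so that all frames of a given sequence end up on the same side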
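
As a worked example of the arithmetic, the sketch below applies a 0.9 split to 23 sequences, the number mentioned earlier in this notebook. The function name and the placeholder sequence names are made up for illustration.

```python
def split_by_sequence(sequence_names, split=0.9):
    """Split sorted sequence names into (train, validation) lists."""
    ordered = sorted(sequence_names)
    cut = int(split * len(ordered))  # int() truncates, e.g. int(20.7) == 20
    return ordered[:cut], ordered[cut:]


# placeholder names standing in for the 23 recording sequences
demo_sequences = ['seq_{:02d}'.format(i) for i in range(23)]
train_demo, val_demo = split_by_sequence(demo_sequences, split=0.9)

print(len(train_demo), len(val_demo))
```

With 23 sequences this gives 20 training sequences and 3 validation sequences. Because sequences differ in length, the share of records in each set is not necessarily 90/10.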

In [None]:
split = 0.9
train_sequences = sequences[:int(split*len(sequences))]
val_sequences = sequences[int(split*len(sequences)):]

In [None]:
train_manifest = {k:manifest[k] for k in manifest.keys() if manifest[k]['sequence_id'] in train_sequences}
val_manifest = {k:manifest[k] for k in manifest.keys() if manifest[k]['sequence_id'] in val_sequences}

In [None]:
print("training set contains {} records".format(len(train_manifest)))
print("validation set contains {} records".format(len(val_manifest)))

In [None]:
with open(work_dir + "/train_manifest.json", "w") as file:
    json.dump(train_manifest, file)

with open(work_dir + "/val_manifest.json", "w") as file:
    json.dump(val_manifest, file)

In [None]:
for file in ['train_manifest.json', 'val_manifest.json']:

    s3.upload_file(
        Filename=work_dir + '/' + file,
        Bucket=bucket,
        Key=dataset_prefix + '/metadata/{}'.format(file))
 
train_path = 's3://{}/'.format(bucket) + dataset_prefix + '/metadata/'
print('Training manifests sent to {}'.format(train_path))

# 2. Single-GPU training
We create the training script as a single Python file.
To make the training code scalable and portable, we create a custom PyTorch Dataset that reads images and segmentation masks directly from S3 and saves them in a local cache for later re-use (e.g. when training for multiple epochs). That way, the data pipeline does not need to wait for the whole dataset to be downloaded locally, and it reads at low latency after the first epoch.
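
The read-through cache idea can be sketched independently of PyTorch and S3. The class below is a simplified illustration and not the code of the Dataset used in this notebook: `CachedFetcher` and `fake_fetch` are hypothetical names, and in practice the `fetch` callable could wrap an S3 download such as boto3's `download_file`.

```python
import os
import tempfile


class CachedFetcher:
    """Fetch a file on first access, then serve it from a local cache folder."""

    def __init__(self, cache_dir, fetch):
        self.cache_dir = cache_dir
        self.fetch = fetch  # callable(key, destination_path)

    def local_path(self, key):
        path = os.path.join(self.cache_dir, key)
        if not os.path.isfile(path):
            # first access: create the parent folders and download the file
            os.makedirs(os.path.dirname(path), exist_ok=True)
            self.fetch(key, path)
        return path


calls = []


def fake_fetch(key, destination):
    """Stand-in for a remote download; records each call and writes dummy bytes."""
    calls.append(key)
    with open(destination, 'wb') as f:
        f.write(b'pixels')


with tempfile.TemporaryDirectory() as cache_dir:
    fetcher = CachedFetcher(cache_dir, fake_fetch)
    first = fetcher.local_path('seq/camera/frame.png')   # triggers a fetch
    second = fetcher.local_path('seq/camera/frame.png')  # served from cache

print(len(calls))
```

The second access does not call `fetch` again, which is what makes epochs after the first one read from local disk.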

**Note** this DL training code is **far from state-of-the-art**. The goal of this sample is not to reach a good accuracy, but rather to show how to scale custom training jobs in Amazon SageMaker. 

 * **Better accuracy** can likely be reached using data augmentation, learning rate scheduling, a better backbone, and adding the auxiliary DeepLabV3 loss. A totally different segmentation model could also be tried instead of DeepLabV3
 
 * **Better throughput** can likely be reached using a sequential-access dataset that reads groups of records, or SageMaker Fast File Mode, which streams files upon read request. Also, although I'm configuring it below, I am not sure whether float16 precision compute occurs and whether NVIDIA Tensor Cores are actually used. This would be an important step to make full use of the computational power of modern NVIDIA cards. Converting labels to grayscale should also help make data loading lighter. Offloading data loading to the GPU, for example using NVIDIA DALI, is another axis to explore to boost throughput.

## Run locally

In [None]:
# create a folder to cache dataset as it is downloaded by the dataset class
print('Local dataset cache created at {}'.format(local_dataset_cache))
print('Local checkpoints will be stored at {}'.format(local_checkpoint_location))

try:
    os.mkdir(local_dataset_cache)

except FileExistsError:
    print('{} already exists'.format(local_dataset_cache))

try:
    os.mkdir(local_checkpoint_location)

except FileExistsError:
    print('{} already exists'.format(local_checkpoint_location))

### Single-device code
can be run in a Python process

In [None]:
%%time

# test on 20 iterations. 
# This takes a few minutes. You can see instance activity live using htop or nvidia-smi in an instance terminal
! python a2d2_code/train.py --dataset $work_dir \
 --cache $local_dataset_cache \
 --height 604 \
 --width 960 \
 --checkpoint-dir $local_checkpoint_location \
 --batch 12 \
 --network deeplabv3_mobilenet_v3_large \
 --workers 24 \
 --log-freq 20 \
 --prefetch 2 \
 --bucket $bucket \
 --eval-size 10 \
 --iterations 20 \
 --class-list a2d2_images/camera_lidar_semantic/class_list.json

## Launch in SageMaker Training
We use the SageMaker Python SDK to orchestrate SageMaker Training clusters. Note that if you don't want to learn yet another SDK, you can also do exactly the same thing with existing AWS SDKs, for example the AWS CLI (https://docs.aws.amazon.com/cli/latest/reference/sagemaker/create-training-job.html) and boto3 (https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/sagemaker.html#SageMaker.Client.create_training_job)

In [None]:
config = {
    'bucket': bucket,
    'cache': '/opt/ml/input/data/dataset',
    'height': 604,
    'width': 960,
    'epochs': 10,
    'batch': 12,
    'prefetch': 1,
    'workers': 40,
    'eval-size': 36,
    'lr': 0.183,
    'momentum': 0.928,
    'lr_warmup_ratio': 0.1,
    'lr_decay_per_epoch': 0.3,
    'log-freq': 500}

In [None]:
# Training time of this job will be approximately 12h

token = str(uuid.uuid4())[:10] # we create a unique token to avoid checkpoint collisions in S3

job = PyTorch(
    entry_point='train.py',
    source_dir='a2d2_code',
    role=get_execution_role(),
    framework_version='1.8.1',
    instance_count=1,
    instance_type='ml.g4dn.16xlarge',
    base_job_name='A2D2-single-GPU-seg-training',
    py_version='py36',
    hyperparameters=config,
    checkpoint_s3_uri='s3://{}/{}/checkpoints'.format(bucket, token),  # S3 destination of /opt/ml/checkpoints files
    output_path='s3://{}/{}'.format(bucket, token),
    code_location='s3://{}/{}/code'.format(bucket, token),  # source_dir code will be staged in S3 there
    environment={"SMDEBUG_LOG_LEVEL":"off"},  # reduce verbosity of Debugger
    debugger_hook_config=False,  # deactivate debugger to avoid warnings in model artifact
    disable_profiler=True,  # keep running resources to a minimum to avoid permission errors
    metric_definitions=[
        {"Name": "Train_loss", "Regex": "Training_loss: ([0-9.]+).*$"},
        {"Name": "Learning_rate", "Regex": "learning rate: ([0-9.]+).*$"},
        {"Name": "Val_loss", "Regex": "Val_loss: ([0-9.]+).*$"},
        {"Name": "Throughput", "Regex": "Throughput: ([0-9.]+).*$"}
    ],
    tags=[{'Key': 'Project', 'Value': 'A2D2_segmentation'}])  # tag the job for experiment tracking

SageMaker-managed I/O uploads only dataset metadata (class_list and manifest). The actual records and labels are fetched upon request directly from S3 or local cache via our custom `Dataset`

In [None]:
# we do an asynchronous fit, so the job doesn't keep the client waiting. 
# closing and shutting down your notebook will not stop this job.
# if you want to stop this SageMaker Training job, use an AWS SDK or the console
job.fit({'dataset': train_path}, wait=False)

## Custom Metric Tuning

SageMaker Automatic Model Tuning is a serverless, managed, use-case-agnostic parameter search service. With SageMaker AMT (sometimes named HPO - Hyperparameter Optimization) you can tune any parameter declared in the container hyperparameter dictionary (continuous, integer or categorical) and you can tune for any metric (minimize or maximize) that you can extract with a regular expression from your container or script logs. SageMaker AMT is not limited to model tuning. You can be creative with it, and for example tune jobs to minimize the training time or training cost. See https://aws.amazon.com/blogs/machine-learning/aerobotics-improves-training-speed-by-24-times-per-sample-with-amazon-sagemaker-and-tensorflow/ for a nice example.

More info:

 * https://docs.aws.amazon.com/sagemaker/latest/dg/automatic-model-tuning.html
 * *Amazon SageMaker Automatic Model Tuning: Scalable Gradient-Free Optimization*, Perrone et al. (https://arxiv.org/abs/2012.08489)
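
To get a feel for how a metric is read from logs, the sketch below applies two of the metric regexes used in this notebook to a sample log line with Python's `re` module. This only approximates what SageMaker does on its side, and the log line is made up: the actual format printed by `train.py` is not shown here.

```python
import re

metric_definitions = [
    {"Name": "Train_loss", "Regex": "Training_loss: ([0-9.]+).*$"},
    {"Name": "Val_loss", "Regex": "Val_loss: ([0-9.]+).*$"},
]


def extract_metrics(log_line, definitions):
    """Return {metric name: value} for every definition matching the line."""
    found = {}
    for definition in definitions:
        match = re.search(definition["Regex"], log_line)
        if match:
            # the first capture group holds the numeric value
            found[definition["Name"]] = float(match.group(1))
    return found


# hypothetical log line, for illustration only
sample_line = "epoch 3, Val_loss: 0.4127, Throughput: 31.5"
print(extract_metrics(sample_line, metric_definitions))
```

Testing a regex this way against a few real lines of your own logs is a cheap check before launching a tuning job, since a metric that never matches gives the tuner nothing to optimize.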

In [None]:
Tuning = False # set to True if you want to test the tuning below

In [None]:
print("tuning cell set to {}".format(Tuning))

if Tuning:

    # we use the SageMaker Tuner
    from sagemaker.tuner import IntegerParameter, ContinuousParameter


    # static hyperparameters, shared by all jobs of the tuning run
    tuning_config = {
        'bucket': bucket,
        'cache': '/opt/ml/input/data/dataset',
        'height': 604,
        'width': 960,
        'epochs': 5,
        'prefetch': 1,
        'workers': 40,
        'eval-size': 36,
        'log-freq': 500}


    # estimator used as a template for every job launched by the tuner
    tuning_estimator = PyTorch(
        entry_point='train.py',
        source_dir='a2d2_code',
        role=get_execution_role(),
        framework_version='1.8.1',
        instance_count=1,
        instance_type='ml.g4dn.16xlarge',
        py_version='py36',
        max_run=28800,  # cap the max runtime at 8h per job
        hyperparameters=tuning_config,
        checkpoint_s3_uri='s3://{}/checkpoints'.format(bucket),  # S3 destination of /opt/ml/checkpoints files
        output_path='s3://{}'.format(bucket),
        code_location='s3://{}/code'.format(bucket),  # source_dir code will be staged in S3 there
        environment={"SMDEBUG_LOG_LEVEL":"off"},  # reduce verbosity of Debugger
        debugger_hook_config=False,  # deactivate debugger to avoid warnings in model artifact
        disable_profiler=True,  # keep running resources to a minimum to avoid permission errors
        metric_definitions=[
            {"Name": "Val_loss", "Regex": "Val_loss: ([0-9.]+).*$"},
        ],
        tags=[{'Key': 'Project', 'Value': 'A2D2_segmentation'}])


    # Define exploration boundaries
    hyperparameter_ranges = {
        'lr': ContinuousParameter(0.001, 0.01),
        'momentum': ContinuousParameter(0.8, 0.99),
        'lr_warmup_ratio': ContinuousParameter(1, 10),
        'lr_decay_per_epoch': ContinuousParameter(0.1, 0.8),
        'batch': IntegerParameter(6, 12)
    }


    # create Optimizer
    # you can tune for anything you can regexp from your logs
    # in this sample we minimize the validation loss
    Optimizer = sagemaker.tuner.HyperparameterTuner(
        estimator=tuning_estimator,
        hyperparameter_ranges=hyperparameter_ranges,
        base_tuning_job_name='Loss-tuner',
        objective_type='Minimize',
        objective_metric_name='Val_loss',
        strategy='Bayesian',
        early_stopping_type='Auto',
        metric_definitions=[
            {"Name": "Val_loss", "Regex": "Val_loss: ([0-9.]+).*$"}
        ],
        max_jobs=40,
        max_parallel_jobs=2)


    Optimizer.fit({'dataset': train_path}, wait=False)

    print("Tuning job launched")

# 3. Predict with trained model
To test the trained model, we run inference on a couple of samples from the validation set. You can run this section on its own once you have a trained model

In [None]:
from matplotlib import pyplot as plt
import torch
from torchvision import models
from torchvision.io import read_image
from torchvision.datasets.vision import VisionDataset
from torchvision.models.segmentation.deeplabv3 import DeepLabHead
from torchvision.transforms import Resize
from torchvision.transforms.functional import InterpolationMode

### Bring a checkpoint from S3
In the cell below we download a checkpoint produced by a training job, which could come either from the above-launched training job or from the training job launched from the optional tuning step.

To check available checkpoints for a given training job, you can inspect the S3 URI returned under `CheckpointConfig` by the `boto3` `describe_training_job` call, or check the S3 Output Path URL in the "Checkpoint configuration" section of the training job detail page
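
As a small sketch of the first option, the helper below reads the checkpoint location out of a `describe_training_job` response. The function name is hypothetical, and the sample response is a trimmed, made-up dictionary so that the snippet runs without AWS credentials; the commented lines show where the real response would come from.

```python
def checkpoint_s3_uri(description):
    """Return the checkpoint S3 URI of a training job description, or None."""
    return description.get('CheckpointConfig', {}).get('S3Uri')


# With AWS credentials available, the description would come from:
#   import boto3
#   description = boto3.client('sagemaker').describe_training_job(
#       TrainingJobName='<your-training-job-name>')

# made-up, trimmed response used for illustration only
sample_description = {
    'TrainingJobName': 'example-job',
    'CheckpointConfig': {
        'S3Uri': 's3://example-bucket/example-token/checkpoints',
        'LocalPath': '/opt/ml/checkpoints',
    },
}

print(checkpoint_s3_uri(sample_description))
```

Listing the objects under that URI then shows which checkpoint files are available for download.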

In [None]:
# you need to wait around 15min until you have the first checkpoint showing up in Amazon S3
# the command below needs the S3 URI of your checkpoint as source argument, before $work_dir
! aws s3 cp $work_dir

In [None]:
model = torch.load(os.path.join(work_dir, 'final_model.pth')) # replace with your model name if different

In [None]:
model.eval()

### Instantiate the dataset
This is necessary for inference pre-processing (applying the same transforms to input and label as during training)

In [None]:
height = 604
width = 960

from a2d2_code.a2d2_utils import A2D2_S3_dataset


image_transform = Resize(
    (height, width),
    interpolation=InterpolationMode.BILINEAR)

target_transform = Resize(
    (height, width),
    interpolation=InterpolationMode.NEAREST)

train_data = A2D2_S3_dataset(
    manifest_file=work_dir + '/train_manifest.json',
    class_list=data_dir + '/camera_lidar_semantic/class_list.json',
    transform=image_transform,
    target_transform=target_transform,
    cache='/home/ec2-user/',
    height=height,
    width=width,
    s3_bucket=bucket)

We measure pixel accuracy on a couple of pictures. IoU is also relevant for segmentation; we leave that for a later iteration :)
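
For reference, per-class IoU is the number of pixels where prediction and label both equal a class, divided by the number of pixels where either of them does. The sketch below computes it in plain Python on two tiny label grids so that it runs without a GPU; the function name and the grids are made up, and the same counts can be obtained on tensors with boolean masks.

```python
def class_iou(prediction, target, class_id):
    """Intersection over union of one class for two equally sized 2D label grids."""
    intersection = 0
    union = 0
    for pred_row, target_row in zip(prediction, target):
        for p, t in zip(pred_row, target_row):
            in_pred = p == class_id
            in_target = t == class_id
            intersection += in_pred and in_target  # pixel has the class in both grids
            union += in_pred or in_target          # pixel has the class in at least one grid
    return intersection / union if union else float('nan')


# made-up 2x3 grids of class IDs
target_demo = [[0, 0, 1],
               [0, 1, 1]]
pred_demo = [[0, 1, 1],
             [0, 1, 0]]

print(class_iou(pred_demo, target_demo, class_id=1))
```

On these grids, 4 of 6 pixels match (pixel accuracy of about 0.67) while the IoU of class 1 is 0.5, which illustrates why the two metrics can tell different stories.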

In [None]:
plt.rcParams["figure.figsize"] = [15, 7]


def pixel_acc(T1, T2):
    # share of pixels where the two 2D tensors agree
    return (T1 == T2).sum()/(T1.size()[0]*T1.size()[1])


# take first 10 pictures from the val_manifest

with open(work_dir + '/val_manifest.json') as file:
    val_manifest = json.load(file)

pic_ids = list(val_manifest.keys())[:10]


for pic_id in pic_ids:

    image_path = val_manifest[pic_id]['image_path']
    label_path = val_manifest[pic_id]['label_path']
    pic = image_transform(read_image(os.path.join(work_dir, image_path)))
    label = target_transform(read_image(os.path.join(work_dir, label_path)))

    # convert the RGB-coded label into a 2D mask of class IDs
    mask = torch.zeros(height, width)
    for rgb, cid in train_data.rgb2ids.items():
        color_mask = label == torch.Tensor(rgb).reshape([3,1,1])
        seg_mask = color_mask.sum(dim=0) == 3
        mask[seg_mask] = cid

    mask = mask.type(torch.int64)

    # inference only, so gradients are not needed
    with torch.no_grad():
        pred = model(torch.div(pic, 255).unsqueeze(0).to("cuda:0"))["out"]
    flat_pred = torch.argmax(pred, dim=1)[0]

    mask_np = mask.cpu().numpy()
    flat_pred_np = flat_pred.cpu().numpy()

    # label on the left, prediction on the right
    fig, (ax1, ax2) = plt.subplots(1, 2)
    fig.suptitle(pic_id)
    ax1.imshow(mask_np)
    ax2.imshow(flat_pred_np)

    print("Image {}: PIXEL ACCURACY: {}".format(pic_id, pixel_acc(flat_pred.cuda(), mask.cuda())))

    print("*"*20)