# Distributed training of tissue slide images using SageMaker and Horovod

## Background

Neural networks have proven effective at solving complex computer vision tasks such as object detection, image similarity, and classification. With the evolution of low cost GPUs, the computational cost of building and deploying a neural network has drastically reduced. However, most of the techniques are designed to handle pixel resolutions commonly found in visual media, as an example, typical resolution size are 544 and 416 pixels for YOLOv3, 300 and 512 pixels for SSD, and 224 pixels for VGG. Training a classifier over a dataset consisting of gigapixel images (10^9+ pixels) such as satellite, CT, or pathology images is computationally challenging. These images cannot be directly input to a neural network due to their size, as each GPU is limited by available memory. This requires specific pre-processing techniques such as tiling to be able to process the original images in smaller chunks. Furthermore, due to the large size of these images, the overall training time tends to be high, often requiring from several days to weeks without the use of proper scaling techniques such as distributed training.

In this notebook, using detection of cancer from tissue slide images as our use-case, we will deploy a highly scalable machine learning pipeline to:

* Pre-process gigapixel images by tiling, zooming, and sorting them into train and test splits using Amazon SageMaker Processing.
* Train an image classifier on pre-processed tiled images using Amazon SageMaker, Horovod and SageMaker Pipe mode.
* Deploy a pre-trained model as an API using Amazon SageMaker.

## Setup

### Install library for visualizing SVS images

First, we install the `slideio` package for visualizing our digital pathology images.

In [None]:
!pip install slideio===0.5.225
!mkdir -p images

### Imports

Here we import the necessary libraries to interact with SageMaker. We define our execution role, region, and the name of the S3 bucket in the account to which the tissue slide images will be downloaded. We also create our SageMaker session.

In [None]:
import boto3
import sagemaker
from sagemaker.processing import Processor, ProcessingInput, ProcessingOutput
from sagemaker import get_execution_role
from sagemaker.tensorflow import TensorFlow
from sagemaker.tensorflow.model import TensorFlowModel
from sagemaker.session import s3_input

role = get_execution_role()
region = boto3.Session().region_name
bucket = 'tcga-data' # Please specify the bucket where the SVS images are downloaded
sagemaker_session = sagemaker.Session()

Next, we'll import the Python libraries we'll need for the remainder of the exercise.

In [None]:
import os
import slideio
import numpy as np
import tensorflow as tf
import matplotlib.pyplot as plt

### TCGA SVS files

In this blog, we will be using a dataset consisting of whole-slide images obtained from The Cancer Genome Atlas (https://www.cancer.gov/about-nci/organization/ccg/research/structural-genomics/tcga) (TCGA) to accurately and automatically classify them into LUAD (Adenocarcinoma), LUSC (squamous cell carcinoma), or normal lung tissue, where LUAD and LUSC are the two most prevalent subtypes of lung cancer. The dataset is available for public use by NIH and NCI. Instructions for downloading data are provided here (http://www.andrewjanowczyk.com/download-tcga-digital-pathology-images-ffpe/). The raw high resolution images are in SVS (https://openslide.org/formats/aperio/) format. SVS files are used for archiving and analyzing Aperio microscope images.The techniques and tools used in this blog can be applied to any ultra-high resolution image datasets such as MRI, CT scans, and satellite. 

Please refer to README file for instructions on downloading SVS images from TCGA. Before running the next cell, make sure to create a folder called `tcga-svs` within the S3 bucket specified above and download the SVS image data to that location.

The output of the next cell contains a sample image of a tissue slide. Notice that this single image contains over quarter million pixels, and occupies over 750 MBs of memory. This image cannot be fed into directly to a neural network in its original form, and therefore it is necessary to tile the image into many smaller images.

In [None]:
# Download sample svs file from S3
s3 = boto3.resource('s3', region_name=region)

image_file = 'TCGA-55-8514-01A-01-TS1.0e0f5cf3-96e9-4a35-aaed-4340df78d389.svs'
key = f'tcga-svs/0000b231-7c05-4e2e-8c9e-6d0675bfbb34/{image_file}'

s3.Bucket(bucket).download_file(key, f'./images/{image_file}')

# Read svs image
slide = slideio.open_slide(path=f"./images/{image_file}", driver="SVS")
scene = slide.get_scene(0)
block = scene.read_block()

# Display image
plt.imshow(block,aspect="auto")
plt.show()

## Build Docker container for preprocessing SVS files into TFRecords

### Dockerfile

Visualize the Docker file that defines the container to be used by SageMaker Processing.

In [None]:
!pygmentize Dockerfile

### Python script for preprocessing

Visualize the python script that orchestrates the preprocessing of the images within the Docker container.

In [None]:
!pygmentize src/script.py

### Build container and upload it to ECR

Build and push the Docker image to Amazon's Elastic Container Registry (ECR) so that it can be used by SageMaker Processing.

In [None]:
from docker_utils import build_and_push_docker_image

repository_short_name = 'tcga-tissue-slides-preprocess'
image_name = build_and_push_docker_image(repository_short_name)

## Launch SageMaker Processing Job


Now we are ready to launch the SageMaker Processing job on our images. The SVS slide images are pre-processed in three steps.

* *Tiling images*: The images are tiled by non-overlapping 512×512-pixel windows, and tiles containing over 50% background are discarded. The tiles are stored as JPEG images.
* *Converting images to TFRecords*: We use SageMaker Pipe Mode to reduce our training time, which requires the data to be available in a proto-buffer format. TFRecord is a popular proto-buffer format used for training models with TensorFlow. SageMaker Pipe Mode and proto-buffer format are explained in detail in the following section
* *Sorting TFRecords :* We sort the dataset into test, train and validation cohorts for a 3-way classifier (LUAD/LUSC/Normal). In the TCGA dataset, there can be multiple slide images corresponding to a single patient. We need to make sure all the tiles generated from slides corresponding to the same patient should occupy the same split to avoid data leakage. For the test set, we create per-slide TFRecords containing all of the tiles from that slide so that we may evaluate the model in the way it will eventually be realistically deployed.

In [None]:
processor = Processor(image_uri=image_name,
 role=get_execution_role(),
 instance_count=16, # run the job on 16 instances
 base_job_name='processing-base', # should be unique name
 instance_type='ml.m5.4xlarge', 
 volume_size_in_gb=1000)

processor.run(inputs=[ProcessingInput(
 source=f's3://{bucket}/tcga-svs', # s3 input prefix
 s3_data_type='S3Prefix',
 s3_input_mode='File',
 s3_data_distribution_type='ShardedByS3Key', # Split the data across instances
 destination='/opt/ml/processing/input')], # local path on the container
 outputs=[ProcessingOutput(
 source='/opt/ml/processing/output', # local output path on the container
 destination=f's3://{bucket}/tcga-svs-tfrecords/' # output s3 location
 )],
 arguments=['10000'], # number of tiled images per TF record for training dataset
 wait=True,
 logs=True)

### Visualize tiled images stored within TFRecords

Here are samples of tiled images generated after pre-processing the above tissue slide image. These RGB 3 channels images are of size 512*512 and can be directly used as inputs to a neural network. Each of these tiled images are assigned the same label as the parent slide. Additionally, tiled images with more than 50% background are discarded.

In [None]:
%matplotlib inline

print(tf.__version__)
print(tf.executing_eagerly())

HEIGHT=512
WIDTH=512
DEPTH=3
NUM_CLASSES=3

def dataset_parser(value):
 image_feature_description = {
 'label': tf.io.FixedLenFeature([], tf.int64),
 'image_raw': tf.io.FixedLenFeature([], tf.string),
 'slide_string': tf.io.FixedLenFeature([], tf.string)
 }
 record = tf.io.parse_single_example(value, image_feature_description)
 image = tf.io.decode_raw(record['image_raw'], tf.float32)
 image = tf.cast(image, tf.float32)
 image.set_shape([DEPTH * HEIGHT * WIDTH])
 image = tf.cast(tf.reshape(image, [HEIGHT, WIDTH, DEPTH]), tf.float32)
 label = tf.cast(record['label'], tf.int32)
 slide = record['slide_string']
 
 return image, label, slide

# List first 10 tiled images

key = 'tcga-svs-tfrecords/test'

file = [f for f in s3.Bucket(bucket).objects.filter(Prefix=key).limit(1)][0]
local_file = file.key.split('/')[-1]
s3.Bucket(bucket).download_file(file.key, f'./images/{local_file}')

raw_image_dataset = tf.data.TFRecordDataset(f'./images/{local_file}')
parsed_image_dataset = raw_image_dataset.map(dataset_parser)

c = 0
for image_features in parsed_image_dataset:
 image_raw = image_features[0].numpy()
 label = image_features[1].numpy()
 
 plt.figure()
 plt.imshow(image_raw/255) 
 plt.title(f'Full image: {image_features[2].numpy().decode()}, Label: {label}')

 c += 1
 if c == 10:
 break

## Distributed training with Horovod and SageMaker Pipe Mode input
When training a model with large amount of data, the data needs to distributed across multiple CPUs/GPUs on either a single instance or multiple instances. Deep learning frameworks provide their own methods to support distributed training. [Horovod](https://eng.uber.com/horovod/) is a popular, framework-agnostic toolkit for distributed deep learning. It utilizes an all-reduce algorithm for fast distributed training (compared with parameter server approach) and also includes multiple optimization methods to make the distributed training faster. Examples of distributed training with Horovod on SageMaker are available via other AWS blogs ([TensorFlow](https://aws.amazon.com/blogs/machine-learning/multi-gpu-and-distributed-training-using-horovod-in-amazon-sagemaker-pipe-mode/), [MXNet](https://aws.amazon.com/blogs/machine-learning/reducing-training-time-with-apache-mxnet-and-horovod-on-amazon-sagemaker/)).

The following cell defines useful variables for the distributed training process. This includes the computation of the appropriate number of shards given the chosen `train_instance_type` and `train_instance_count`. Also note that the value of `gpus_per_host` should reflect the number of GPUs associated with the `train_instance_type`, which in this case is 4.

In [None]:
train_instance_type='ml.p3.8xlarge'
train_instance_count = 4
gpus_per_host = 4
num_of_shards = gpus_per_host * train_instance_count

distributions = {'mpi': {
 'enabled': True,
 'processes_per_host': gpus_per_host
 }
}

### Sharding the tiles 

SageMaker Pipe Mode is a mechanism for providing S3 data to a training job via Linux pipes. Training programs can read from the fifo pipe and get high-throughput data transfer from S3, without managing the S3 access in the program itself. Pipe Mode is covered in more detail in the SageMaker [documentation](https://sagemaker.readthedocs.io/en/stable/frameworks/tensorflow/using_tf.html#training-with-pipe-mode-using-pipemodedataset).

There are few considerations that we need to keep in mind when working with SageMaker Pipe mode and Horovod:

* The data that is streamed through each pipe is mutually exclusive of each of the other pipes. The number of pipes dictates the number of data shards that need to be created. 
* Horovod wraps the training script for each compute instance. This means that data for each compute instance needs to be allocated to a different shard.
* With the SageMaker Training parameter S3DataDistributionType set to `ShardedByS3Key`, we can share a pipe with more than one instance. The data is streamed in round-robin fashion across instance as shown in the figure below.

The following cell shards the data within S3 to prepare it as input for distributed training with Pipe mode.

In [None]:
# Sharding
client = boto3.client('s3')
result = client.list_objects(Bucket=bucket, Prefix='tcga-svs-tfrecords/train/', Delimiter='/')

j = -1
for i in range(num_of_shards):
 copy_source = {
 'Bucket': bucket,
 'Key': result['Contents'][i]['Key']
 }
 print(result['Contents'][i]['Key'])
 if i % gpus_per_host == 0:
 j += 1
 dest = 'tcga-svs-tfrecords/train_sharded/' + str(j) +'/' + result['Contents'][i]['Key'].split('/')[2]
 print(dest)
 s3.meta.client.copy(copy_source, bucket, dest)

Now that the data is sharded, we can assign these shards as `remote_inputs` to our SageMaker training job.

In [None]:
svs_tf_sharded = f's3://{bucket}/tcga-svs-tfrecords'
shuffle_config = sagemaker.session.ShuffleConfig(234)
train_s3_uri_prefix = svs_tf_sharded
remote_inputs = {}

for idx in range(gpus_per_host):
 train_s3_uri = f'{train_s3_uri_prefix}/train_sharded/{idx}/'
 train_s3_input = s3_input(train_s3_uri, distribution ='ShardedByS3Key', shuffle_config=shuffle_config)
 remote_inputs[f'train_{idx}'] = train_s3_input
 remote_inputs['valid_{}'.format(idx)] = '{}/valid'.format(svs_tf_sharded)
remote_inputs['test'] = '{}/test'.format(svs_tf_sharded)
remote_inputs

### Training script

First, we visualize the training script to be used by SageMaker.

In [None]:
!pygmentize src/train.py

Now we are ready to initialize our SageMaker TensorFlow estimator, specifying `input_mode='Pipe'` to engage Pipe mode and providing our `distributions` variable defined above to activate distributed training. Finally, we call the `fit` method with the `remote_inputs` as the first argument.

In [None]:
local_hyperparameters = {'epochs': 5, 'batch-size' : 16, 'num-train':160000, 'num-val':8192, 'num-test':8192}

estimator_dist = TensorFlow(base_job_name='svs-horovod-cloud-pipe',
 entry_point='src/train.py',
 role=role,
 framework_version='2.1.0',
 py_version='py3',
 distribution=distributions,
 volume_size=1024,
 hyperparameters=local_hyperparameters,
 output_path=f's3://{bucket}/output/',
 instance_count=4, 
 instance_type=train_instance_type,
 input_mode='Pipe')

estimator_dist.fit(remote_inputs, wait=True)

## Deploy the trained model

After training the model using Amazon SageMaker, we can now deploy the trained model to peform inference on new images. A model can be deployed using Amazon SageMaker to get predictions in the following ways:

* To set up a persistent endpoint to get one prediction at a time, use SageMaker hosting services.
* To get predictions for an entire dataset, use SageMaker batch transform.

In this blog post, we will deploy the trained model as a SageMaker endpoint.

In [None]:
%matplotlib inline

plt.style.use('bmh')

The `deploy()` method creates an endpoint that serves prediction requests in real-time.
The model saves keras artifacts; to use TensorFlow serving for deployment, you'll need to save the artifacts in SavedModel format.

In [None]:
# Create predictor from S3 instead

model_data = f's3://{bucket}/output/{estimator_dist.latest_training_job.name}/output/model.tar.gz'

model = TensorFlowModel(model_data=model_data, 
 role=role, framework_version='2.1.0')

predictor = model.deploy(initial_instance_count=1, instance_type='ml.c5.xlarge')

## Make some predictions

Remember that the model is trained on individual tile images. During inference, the SageMaker endpoint provides classification scores for each tile. These scores are averaged out across all tiles to generate the slide-level score and prediction. A majority-vote scheme would also be appropriate. 

The following cells read preprocessed image data from a TFRecords file and use the SageMaker endpoint to compute predictions for each of the tiles. We first define a helper function to extract the individual tile images.

In [None]:
HEIGHT=512
WIDTH=512
DEPTH=3
NUM_CLASSES=3

def _dataset_parser_with_slide(value):
 image_feature_description = {
 'label': tf.io.FixedLenFeature([], tf.int64),
 'image_raw': tf.io.FixedLenFeature([], tf.string),
 'slide_string': tf.io.FixedLenFeature([], tf.string)
 }
 example = tf.io.parse_single_example(value, image_feature_description)
 image = tf.io.decode_raw(example['image_raw'], tf.float32)
 image = tf.cast(image, tf.float32)
 image.set_shape([DEPTH * HEIGHT * WIDTH])
 image = tf.cast(tf.reshape(image, [HEIGHT, WIDTH, DEPTH]), tf.float32)
 label = tf.cast(example['label'], tf.int32)
 slide = example['slide_string']
 
 return image, label, slide

### Tile-level prediction

In the following cell, we create and parse a `TFRecordDataset` from a TFRecords file stored locally at `./images` and use the `predict()` method to perform inference on each of the extracted tiles.

In [None]:
local_file = [each for each in os.listdir('./images') if each.endswith('.tfrecords')][0]

raw_image_dataset = tf.data.TFRecordDataset(f'./images/{local_file}') ## read a TFrecord
parsed_image_dataset = raw_image_dataset.map(_dataset_parser_with_slide) ## Parse TFrecord to JPEGs

pred_scores_list = []
for i, element in enumerate(parsed_image_dataset):
 if i > 10:
 break
 image = element[0].numpy()
 label = element[1].numpy()
 slide = element[2].numpy().decode()
 if i == 0:
 print(f'Making tile-level predictions for slide: {slide}...')

 print(f'Querying endpoint for a prediction for tile {i+1}...')
 pred_scores = predictor.predict(np.expand_dims(image, axis=0))['predictions'][0]
 print(pred_scores)
 pred_class = np.argmax(pred_scores) 
 print(pred_class)
 
 if i > 0 and i % 10 == 0:
 plt.figure()
 plt.title(f'Tile {i} prediction: {pred_class}') 
 plt.imshow(image / 255)
 
 pred_scores_list.append(pred_scores)
print('Done.')

### Slide-level prediction (average score over all tiles)

Once the endpoint has classified each of the tiles, we can average them together for a final classification of the entire slide image.

In [None]:
mean_pred_scores = np.mean(np.vstack(pred_scores_list), axis=0)
mean_pred_class = np.argmax(mean_pred_scores)

print(f"Slide-level prediction for {slide}:", mean_pred_class)