# Distributed training with Horovod
Horovod is a distributed training framework based on MPI. Horovod is only available with TensorFlow version 1.12 or newer. You can find more details at [Horovod README](https://github.com/uber/horovod).

To enable Horovod, we need to make a few small changes to our training script.

## Create a training script that supports Horovod distributed training
Create a copy of the script (training_script/cifar10_keras_sm.py, **not the pipe script**) and save it as training_script/cifar10_keras_dist.py. Then make the following changes, starting in:
```python
def main(args):
```

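### Start Horovod
Add Horovod support at the beginning of `main` using the following code (`tf` and `K` are assumed to be the script's existing `tensorflow` and Keras backend imports):
```python
    import horovod.keras as hvd
    # Initialize Horovod; every process runs this same script
    hvd.init()
    # Let TensorFlow grow GPU memory as needed and pin this process
    # to a single GPU, chosen by its rank on the local host
    config = tf.ConfigProto()
    config.gpu_options.allow_growth = True
    config.gpu_options.visible_device_list = str(hvd.local_rank())
    K.set_session(tf.Session(config=config))
```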
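Horovod typically runs one process per GPU, and `hvd.local_rank()` is a process's index on its own host, which is why it (and not the global `hvd.rank()`) selects the GPU. The sketch below is a hypothetical illustration that assumes ranks are assigned to hosts in contiguous blocks; Horovod itself obtains these values from MPI rather than computing them this way.

```python
def local_rank(global_rank: int, processes_per_host: int) -> int:
    """Hypothetical helper: a process's rank on its own host, assuming ranks
    are placed on hosts in contiguous blocks of processes_per_host."""
    return global_rank % processes_per_host


def visible_device_list(global_rank: int, processes_per_host: int) -> str:
    """GPU index string in the same format passed to
    config.gpu_options.visible_device_list."""
    return str(local_rank(global_rank, processes_per_host))


if __name__ == "__main__":
    # Two hosts with four processes each: ranks 0-3 on host 0, ranks 4-7 on host 1
    for rank in range(8):
        print(f"rank {rank} -> GPU {visible_device_list(rank, 4)}")
```

With two single-GPU instances and one process per host, both processes get local rank 0 and therefore both use GPU 0 on their own instance.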

### Configure callbacks
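Add the following Horovod callbacks:
```python
# Broadcast the initial variable states from rank 0 so all processes start from the same weights
hvdBroadcast = hvd.callbacks.BroadcastGlobalVariablesCallback(0)
# Average the metrics across all processes at the end of every epoch
hvdMetricAverage = hvd.callbacks.MetricAverageCallback()
# Gradually ramp up the learning rate during the first 5 epochs
hvdLearningRate = hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=5, verbose=1)
```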
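`LearningRateWarmupCallback` starts training with a reduced learning rate and raises it linearly until it reaches the optimizer's initial learning rate after `warmup_epochs`. The function below is a sketch of that schedule, assuming the ramp goes from `initial_lr / size` to `initial_lr` as described in the Horovod documentation; Horovod's callback applies it per batch, and its exact per-step arithmetic may differ.

```python
def warmup_learning_rate(initial_lr: float, size: int,
                         warmup_epochs: float, epoch: float) -> float:
    """Linear warmup from initial_lr / size up to initial_lr over warmup_epochs.

    epoch may be fractional (e.g. 2.5 means halfway through the third epoch).
    """
    if epoch >= warmup_epochs:
        return initial_lr
    multiplier = (1.0 / size) * (epoch * (size - 1) / warmup_epochs + 1)
    return initial_lr * multiplier


if __name__ == "__main__":
    # With 2 processes and a 5-epoch warmup, the rate doubles over the first 5 epochs
    for epoch in range(7):
        print(epoch, round(warmup_learning_rate(0.01, 2, 5, epoch), 5))
```

With a single process (`size` of 1) the multiplier is always 1, so the warmup has no effect.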

Change the checkpoint and TensorBoard callbacks to run only when `hvd.rank() == 0` (you want only a single process to write checkpoints and send logs):
```python
callbacks = [hvdBroadcast, hvdMetricAverage, hvdLearningRate]
if hvd.rank() == 0:
    callbacks.append(checkpoint)
    callbacks.append(tb_callback)
```
Update `model.fit` to use the new `callbacks` list.
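The rank check can also be wrapped in a small function, which makes the rule easy to test without a cluster. `build_callbacks` is a hypothetical helper, not part of Horovod.

```python
from typing import List, Sequence, TypeVar

T = TypeVar("T")


def build_callbacks(rank: int,
                    horovod_callbacks: Sequence[T],
                    rank0_only_callbacks: Sequence[T]) -> List[T]:
    """Callbacks for one Horovod process (hypothetical helper).

    Every process needs the Horovod callbacks; only rank 0 also writes
    checkpoints and TensorBoard logs, so the output is not duplicated.
    """
    callbacks = list(horovod_callbacks)
    if rank == 0:
        callbacks.extend(rank0_only_callbacks)
    return callbacks
```

In the script it would be called as `build_callbacks(hvd.rank(), [hvdBroadcast, hvdMetricAverage, hvdLearningRate], [checkpoint, tb_callback])`.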

### Configure the optimizer
Configure the Horovod optimizer in `keras_model_fn`. First, add `hvd` to the function signature:
```python
# Add hvd to the function, and also pass it where the function is called
def keras_model_fn(learning_rate, weight_decay, optimizer, momentum, hvd):
```
Then change `size=1` to `size=hvd.size()`.
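The Horovod documentation recommends scaling the learning rate by the number of workers, because the effective batch size of synchronous training grows with each added process. The sketch below assumes that `keras_model_fn` uses `size` as this multiplier; `scaled_learning_rate` is a hypothetical name.

```python
def scaled_learning_rate(base_lr: float, num_processes: int) -> float:
    """Linear scaling rule: multiply the single-process learning rate by the
    number of Horovod processes (the value of hvd.size())."""
    if num_processes < 1:
        raise ValueError("num_processes must be at least 1")
    return base_lr * num_processes


if __name__ == "__main__":
    # Two single-GPU instances with one process each -> hvd.size() == 2
    print(scaled_learning_rate(0.001, 2))
```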

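Wrap the optimizer with Horovod's `DistributedOptimizer`, which averages the gradients across all processes, by adding
```python
opt = hvd.DistributedOptimizer(opt)
```
before
```python
    model.compile(loss='categorical_crossentropy',
                  optimizer=opt,
                  metrics=['accuracy'])
```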
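Conceptually, `DistributedOptimizer` lets every process compute gradients on its own batch and then replaces them with the average across all processes (an allreduce) before the weights are updated, so the model copies stay identical. The function below only simulates that averaging step on plain Python lists; it does not reproduce Horovod's actual allreduce communication.

```python
from typing import List, Sequence


def allreduce_average(worker_gradients: Sequence[Sequence[float]]) -> List[float]:
    """Element-wise mean of one gradient vector per process.

    Every process ends up with this same averaged gradient, which is what
    keeps the model replicas in sync after each optimizer step.
    """
    if not worker_gradients:
        raise ValueError("need at least one worker")
    num_workers = len(worker_gradients)
    return [sum(values) / num_workers for values in zip(*worker_gradients)]


if __name__ == "__main__":
    # Gradients for the same two weights, computed on different batches
    print(allreduce_average([[0.2, -0.4], [0.6, 0.0]]))
```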

## Run distributed training
To start a distributed training job with Horovod, configure the job distribution:
```python
distributions = {
    'mpi': {
        'enabled': True,
        'processes_per_host': 1,  # Number of Horovod processes per host
    }
}
```

Run the same job on two ml.p3.2xlarge instances (each instance has a single GPU, so `processes_per_host` is 1), and add the `distributions` configuration to the estimator.
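With the MPI configuration above, SageMaker starts `processes_per_host` Horovod processes on each instance, so `hvd.size()` inside the script equals the instance count times `processes_per_host`. The helper below is hypothetical and only computes that number from the same `distributions` dictionary.

```python
def horovod_world_size(instance_count: int, distributions: dict) -> int:
    """Number of Horovod processes (the value of hvd.size()) for a job.

    Hypothetical helper: assumes MPI is enabled and processes_per_host is set.
    """
    mpi = distributions["mpi"]
    if not mpi.get("enabled", False):
        raise ValueError("MPI distribution is not enabled")
    return instance_count * mpi["processes_per_host"]


distributions = {'mpi': {'enabled': True, 'processes_per_host': 1}}
print(horovod_world_size(2, distributions))  # two single-GPU instances -> 2
```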

```python
import os
import sagemaker
from sagemaker import get_execution_role

sagemaker_session = sagemaker.Session()

role = get_execution_role()
```

### Load the SageMaker experiment

```python
from smexperiments.experiment import Experiment
from smexperiments.trial import Trial
import time
cifar10_experiment = Experiment.load(
    experiment_name="TensorFlow-cifar10-experiment")
```

```python
# Create a new trial
trial_name = f"cifar10-training-job-distributed-{int(time.time())}"
trial = Trial.create(
    trial_name=trial_name,
    experiment_name=cifar10_experiment.experiment_name
)
```

```python
# Upload the local data directory to S3 and keep the returned S3 URI
dataset_location = sagemaker_session.upload_data(path='data', key_prefix='data/DEMO-cifar10')
display(dataset_location)
```

```python
metric_definitions = [
    {'Name': 'train:loss', 'Regex': 'loss: ([0-9\\.]+) - acc: [0-9\\.]+'},
    {'Name': 'train:accuracy', 'Regex': 'loss: [0-9\\.]+ - acc: ([0-9\\.]+)'},
    {'Name': 'validation:accuracy', 'Regex': 'val_loss: [0-9\\.]+ - val_acc: ([0-9\\.]+)'},
    {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9\\.]+) - val_acc: [0-9\\.]+'},
]
```
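SageMaker applies each `Regex` to the training job's log output and uses the capture group as the metric value. The snippet below checks the patterns locally against a made-up Keras progress line (illustrative only, not real job output); `extract_metrics` is a hypothetical helper. The patterns expect `acc`/`val_acc`, so Keras versions that log `accuracy`/`val_accuracy` instead would need different patterns.

```python
import re

metric_definitions = [
    {'Name': 'train:loss', 'Regex': 'loss: ([0-9\\.]+) - acc: [0-9\\.]+'},
    {'Name': 'train:accuracy', 'Regex': 'loss: [0-9\\.]+ - acc: ([0-9\\.]+)'},
    {'Name': 'validation:accuracy', 'Regex': 'val_loss: [0-9\\.]+ - val_acc: ([0-9\\.]+)'},
    {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9\\.]+) - val_acc: [0-9\\.]+'},
]

# Made-up Keras progress line, for illustration only
sample_line = ("390/390 [==============================] - 40s 103ms/step - "
               "loss: 1.2345 - acc: 0.5600 - val_loss: 1.1000 - val_acc: 0.6100")


def extract_metrics(line, definitions):
    """Return {metric name: value} for every pattern that matches the line."""
    values = {}
    for definition in definitions:
        match = re.search(definition['Regex'], line)
        if match:
            values[definition['Name']] = float(match.group(1))
    return values


print(extract_metrics(sample_line, metric_definitions))
```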

```python
from sagemaker.tensorflow import TensorFlow
# Change base_job_name to 'cifar10-dist' for console visibility
# Remember to configure distributions = ...
estimator = ...
```

Connect the trial created above to the job by adding the experiment configuration to the `fit` call:
```python
experiment_config = {
    "ExperimentName": cifar10_experiment.experiment_name,
    "TrialName": trial.trial_name,
    "TrialComponentDisplayName": "Training"}
```

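```python
estimator.fit({'train': 'train_data_location',
               'validation': 'validation_data_location',
               'eval': 'eval_data_location'},
              experiment_config={
                  "ExperimentName": cifar10_experiment.experiment_name,
                  "TrialName": trial.trial_name,
                  "TrialComponentDisplayName": "Training"})
```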

### Analyze the experiments

```python
search_expression = {
    "Filters": [
        {
            "Name": "DisplayName",
            "Operator": "Equals",
            "Value": "Training",
        }
    ],
}
```

```python
import pandas as pd
pd.options.display.max_columns = 500

from sagemaker.analytics import ExperimentAnalytics
trial_component_analytics = ExperimentAnalytics(
    sagemaker_session=sagemaker_session,
    experiment_name=cifar10_experiment.experiment_name,
    search_expression=search_expression
)

table = trial_component_analytics.dataframe(force_refresh=True)
display(table)
```

**Good job!** 
You can now run distributed jobs with SageMaker training jobs.
Before continuing to the next notebook, look at the distributed job's metrics in CloudWatch and TensorBoard.
You can use TensorBoard to compare the different jobs that you ran by giving each log directory a run name.
Run TensorBoard with
`--logdir dist:dist_model_dir,pipe:pipe_model_dir,file:normal_job_model_dir`
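If you have several jobs to compare, the `--logdir` value can be built from a mapping of run names to local log directories. `tensorboard_logdir` is a hypothetical helper, and `dist_model_dir`, `pipe_model_dir`, and `normal_job_model_dir` are the placeholder paths from the command above. Recent TensorBoard releases accept this `name:path` form through `--logdir_spec` instead of `--logdir`.

```python
def tensorboard_logdir(runs: dict) -> str:
    """Join {run name: log directory} into TensorBoard's name:path,name:path form."""
    for name in runs:
        # Colons and commas are separators in this format, so names cannot contain them
        if ":" in name or "," in name:
            raise ValueError(f"run name {name!r} may not contain ':' or ','")
    return ",".join(f"{name}:{path}" for name, path in runs.items())


runs = {
    "dist": "dist_model_dir",
    "pipe": "pipe_model_dir",
    "file": "normal_job_model_dir",
}
print("--logdir " + tensorboard_logdir(runs))
```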