# Horovod Distributed Training with SageMaker TensorFlow script mode.

Horovod is a distributed training framework based on Message Passing Interfae (MPI). For information about Horovod, see [Horovod README](https://github.com/uber/horovod).

You can perform distributed training with Horovod on SageMaker by using the SageMaker Tensorflow container. If MPI is enabled when you create the training job, SageMaker creates the MPI environment and executes the `mpirun` command to execute the training script. Details on how to configure mpi settings in training job are described later in this example.

In this example notebook, we create a Horovod training job that uses the MNIST data set.

## Set up the environment

We get the `IAM` role that this notebook is running as and pass that role to the TensorFlow estimator that SageMaker uses to get data and perform training.

In [2]:
import sagemaker
import os
from sagemaker.utils import sagemaker_timestamp
from sagemaker.tensorflow import TensorFlow
from sagemaker import get_execution_role
import time

sagemaker_session = sagemaker.Session()

default_s3_bucket = sagemaker_session.default_bucket()
sagemaker_iam_role = get_execution_role()

train_script = "mnist_hvd.py"

## Prepare Data for training

Now we download the MNIST dataset to the local `/tmp/data/` directory and then upload it to an S3 bucket. After uploading the dataset to S3, we delete the data from `/tmp/data/`. 

In [3]:
import os
import shutil

import numpy as np

import keras
from keras.datasets import mnist
(x_train, y_train), (x_test, y_test) = mnist.load_data()

s3_train_path = "s3://{}/mnist/train.npz".format(default_s3_bucket)
s3_test_path = "s3://{}/mnist/test.npz".format(default_s3_bucket)

# Create local directory
! mkdir -p /tmp/data/mnist_train
! mkdir -p /tmp/data/mnist_test

# Save data locally
np.savez('/tmp/data/mnist_train/train.npz', data=x_train, labels=y_train)
np.savez('/tmp/data/mnist_test/test.npz', data=x_test, labels=y_test)

# Upload the dataset to s3
! aws s3 cp /tmp/data/mnist_train/train.npz $s3_train_path
! aws s3 cp /tmp/data/mnist_test/test.npz $s3_test_path

print('training data at ', s3_train_path)
print('test data at ', s3_test_path)
! rm -rf /tmp/data

Using TensorFlow backend.


Downloading data from https://s3.amazonaws.com/img-datasets/mnist.npz
upload: ../../../../../tmp/data/mnist_train/train.npz to s3://sagemaker-us-east-1-328296961357/mnist/train.npz
upload: ../../../../../tmp/data/mnist_test/test.npz to s3://sagemaker-us-east-1-328296961357/mnist/test.npz
training data at  s3://sagemaker-us-east-1-328296961357/mnist/train.npz
test data at  s3://sagemaker-us-east-1-328296961357/mnist/test.npz


## Write a script for horovod distributed training

This example is based on the [Keras MNIST horovod example](https://github.com/uber/horovod/blob/master/examples/keras_mnist.py) example in the horovod github repository.

To run this script we have to make following modifications:

### 1. Accept `--model_dir` as a command-line argument
Modify the script to accept `model_dir` as a command-line argument that defines the directory path (i.e. `/opt/ml/model/`) where the output model is saved. Because Sagemaker deletes the training cluster when training completes, saving the model to `/opt/ml/model/` directory prevents the trained model from getting lost, because when the training job completes, SageMaker writes the data stored in `/opt/ml/model/` to an S3 bucket. 

This also allows the SageMaker training job to integrate with other SageMaker services, such as hosted inference endpoints or batch transform jobs. It also allows you to host the trained model outside of SageMaker.

The following code adds `model_dir` as a command-line argument to the script:

```
parser = argparse.ArgumentParser()
parser.add_argument('--model_dir', type=str)
```

More details can be found [here](https://github.com/aws/sagemaker-containers/blob/master/README.rst).

### 2. Load train and test data

You can get local directory path where the `train` and `test` data is downloaded by reading the environment variable `SM_CHANNEL_TRAIN` and `SM_CHANNEL_TEST` respectively.
After you get the directory path, load the data into memory.

Here is the code:

```
x_train = np.load(os.path.join(os.environ['SM_CHANNEL_TRAIN'], 'train.npz'))['data']
y_train = np.load(os.path.join(os.environ['SM_CHANNEL_TRAIN'], 'train.npz'))['labels']

x_test = np.load(os.path.join(os.environ['SM_CHANNEL_TEST'], 'test.npz'))['data']
y_test = np.load(os.path.join(os.environ['SM_CHANNEL_TEST'], 'test.npz'))['labels']
```

For a list of all environment variables set by SageMaker that are accessible inside a training script, see [SageMaker Containers](https://github.com/aws/sagemaker-containers/blob/master/README.md).

### 3. Save the model only at the master node

Because in Horovod the training is distributed to multiple nodes, the model should only be saved by the master node. The following code in the script does this:

```
# Horovod: Save model only on worker 0 (i.e. master)
if hvd.rank() == 0:
    saved_model_path = tf.contrib.saved_model.save_keras_model(model, args.model_dir)
```

For more documentation on how to use SageMaker 'script' mode, please, refer to https://sagemaker.readthedocs.io/en/stable/using_tf.html

### Training script

Here is the final training script.

In [4]:
#!pygmentize 'mnist_hvd.py'

## Test locally using SageMaker Python SDK TensorFlow Estimator

You can use the SageMaker Python SDK TensorFlow estimator to easily train locally and in SageMaker.

This notebook shows how to use the SageMaker Python SDK to run your code in a local container before deploying to SageMaker's managed training or hosting environments. Just change your estimator's `train_instance_type` to `local` or `local_gpu`. For more information, see: https://github.com/aws/sagemaker-python-sdk#local-mode.

To use this feature, you need to install docker-compose (and nvidia-docker if you are training with a GPU). Run the following script to install docker-compose or nvidia-docker-compose, and configure the notebook environment for you.

**Note**: You can only run a single local notebook at a time.

In [5]:
!/bin/bash ./setup.sh

/bin/bash: ./setup.sh: No such file or directory


To train locally, set `train_instance_type` to `local`:

In [6]:
train_instance_type='local'
instance_count_local = 2

The MPI environment for Horovod can be configured by setting the following flags in the `mpi` field of the `distribution` dictionary that you pass to the TensorFlow estimator :

* ``enabled (bool)``: If set to ``True``, the MPI setup is performed and ``mpirun`` command is executed.
* ``processes_per_host (int) [Optional]``: Number of processes MPI should launch on each host. Note, this should not be greater than the available slots on the selected instance type. This flag should be set for the multi-cpu/gpu training.
* ``custom_mpi_options (str) [Optional]``: Any mpirun flag(s) can be passed in this field that will be added to the mpirun command executed by SageMaker to launch distributed horovod training.

For more information about the `distribution` dictionary, see the SageMaker Python SDK [README](https://github.com/aws/sagemaker-python-sdk/blob/v1.17.3/src/sagemaker/tensorflow/README.rst).

First, enable MPI:

In [7]:
distributions = {'mpi': {'enabled': True}}

Now, we create the Tensorflow estimator passing the `train_instance_type` and `distribution`

In [8]:
estimator_local = TensorFlow(entry_point=train_script,
                       role=sagemaker_iam_role,
                       train_instance_count=instance_count_local,
                       train_instance_type=train_instance_type,
                       script_mode=True,
                       framework_version='1.12',
                       py_version = 'py3',
                       distributions=distributions,
                       base_job_name='hvd-mnist-local')

Call `fit()` to start the local training 

In [9]:
#%%time
#To save time an not run in 'local' mode, comment out the next line 
#estimator_local.fit({"train":s3_train_path, "test":s3_test_path})

## Train in SageMaker

After you test the training job locally, run it on SageMaker:

First, change the instance type from `local` to the valid EC2 instance type. For example, `ml.c4.xlarge`.

In [10]:
#train_instance_type='ml.p2.xlarge' #1 K80 GPU
#train_instance_type='ml.p2.8xlarge' #8 K80 GPU
#train_instance_type='ml.c4.xlarge' #4 vCPU
#train_instance_type='ml.c4.4xlarge' #16 vCPU
#train_instance_type='ml.c5.4xlarge' #16 vCPU
#train_instance_type='ml.m4.4xlarge' #16 vCPU
train_instance_type='ml.m5.4xlarge' #16 vCPU
instance_count = 2

You can also provide your custom MPI options by passing in the `custom_mpi_options` field of `distribution` dictionary that will be added to the `mpirun` command executed by SageMaker:

In [11]:
distributions = {'mpi': {'enabled': True, "custom_mpi_options": "-verbose --NCCL_DEBUG=INFO"}}

Now, we create the Tensorflow estimator passing the `train_instance_type` and `distribution` to launch the training job in sagemaker.

In [12]:
estimator = TensorFlow(entry_point=train_script,
                       role=sagemaker_iam_role,
                       train_instance_count=instance_count,
                       train_instance_type=train_instance_type,
                       script_mode=True,
                       framework_version='1.12',
                       py_version = 'py3',
                       distributions=distributions,
                       base_job_name='hvd-mnist')

Call `fit()` to start the training

In [13]:
%%time
#print( "instance_type:", train_instance_type, "instance_count:", instance_count, "processes_per_host: 1")
#estimator.fit({"train":s3_train_path, "test":s3_test_path})

CPU times: user 3 µs, sys: 1 µs, total: 4 µs
Wall time: 7.15 µs


##  Horovod training in SageMaker using multiple CPU/GPU

To enable mulitiple CPUs or GPUs for horovod training, set the `processes_per_host` field in the `mpi` section of the `distribution` dictionary to the desired value of processes that will be executed per instance.

In [14]:
instance_count = 2
processes_per_host = 2
print( "instance_type:", train_instance_type, "instance_count:", instance_count, "processes_per_host:", processes_per_host)
distributions = {'mpi': {'enabled': True, 
                         "custom_mpi_options": "-verbose --NCCL_DEBUG=INFO -x OMPI_MCA_btl_vader_single_copy_mechanism=none", 
                         "processes_per_host": processes_per_host}}

instance_type: ml.m5.4xlarge instance_count: 2 processes_per_host: 2


Now, we create the Tensorflow estimator passing the `train_instance_type` and `distribution`

In [15]:
estimator = TensorFlow(entry_point=train_script,
                       role=sagemaker_iam_role,
                       train_instance_count=instance_count,
                       train_instance_type=train_instance_type,
                       script_mode=True,
                       framework_version='1.12', 
                       py_version = 'py3',
                       distributions=distributions,
                       base_job_name='hvd-mnist-multi-cpu')

Call `fit()` to start the training

In [16]:
%%time
#estimator.fit({"train":s3_train_path, "test":s3_test_path})
#print("train_instance_type - ",train_instance_type )

CPU times: user 3 µs, sys: 1e+03 ns, total: 4 µs
Wall time: 6.68 µs


## Improving horovod training performance on SageMaker

Performing Horovod training inside a VPC improves the network latency between nodes, leading to higher performance and stability of Horovod training jobs.

For a detailed explanation of how to configure a VPC for SageMaker training, see [Secure Training and Inference with VPC](https://github.com/aws/sagemaker-python-sdk#secure-training-and-inference-with-vpc).

### Setup VPC infrastructure
We will setup following resources as part of VPC stack:
* `VPC`: AWS Virtual private cloud with CIDR block.
* `Subnets`: Two subnets with the CIDR blocks `10.0.0.0/24` and `10.0.1.0/24`
* `Security Group`: Defining the open ingress and egress ports, such as TCP.
* `VpcEndpoint`: S3 Vpc endpoint allowing sagemaker's vpc cluster to dosenload data from S3.
* `Route Table`: Defining routes and is tied to subnets and VPC.

Complete cloud formation template for setting up the VPC stack can be seen [here](./vpc_infra_cfn.json).

In [17]:
print(sagemaker_iam_role)
import boto3
from botocore.exceptions import ClientError
from time import sleep

def create_vpn_infra(stack_name="hvdvpcstack"):
    cfn = boto3.client("cloudformation")

    cfn_template = open("vpc_infra_cfn.json", "r").read()
    
    try:
        vpn_stack = cfn.create_stack(StackName=(stack_name),
                                     TemplateBody=cfn_template)
    except ClientError as e:
        if e.response['Error']['Code'] == 'AlreadyExistsException':
            print("Stack: {} already exists, so skipping stack creation.".format(stack_name))
        else:
            print("Unexpected error: %s" % e)
            raise e

    describe_stack = cfn.describe_stacks(StackName=stack_name)["Stacks"][0]

    while describe_stack["StackStatus"] == "CREATE_IN_PROGRESS":
        describe_stack = cfn.describe_stacks(StackName=stack_name)["Stacks"][0]
        sleep(0.5)

    if describe_stack["StackStatus"] != "CREATE_COMPLETE":
        raise ValueError("Stack creation failed in state: {}".format(describe_stack["StackStatus"]))

    print("Stack: {} created successfully with status: {}".format(stack_name, describe_stack["StackStatus"]))

    subnets = []
    security_groups = []

    for output_field in describe_stack["Outputs"]:

        if output_field["OutputKey"] == "SecurityGroupId":
            security_groups.append(output_field["OutputValue"])
        if output_field["OutputKey"] == "Subnet1Id" or output_field["OutputKey"] == "Subnet2Id":
            subnets.append(output_field["OutputValue"])

    return subnets, security_groups


subnets, security_groups = create_vpn_infra()
print("Subnets: {}".format(subnets))
print("Security Groups: {}".format(security_groups))

arn:aws:iam::328296961357:role/service-role/AmazonSageMaker-ExecutionRole-20190607T071619
Stack: hvdvpcstack already exists, so skipping stack creation.
Stack: hvdvpcstack created successfully with status: CREATE_COMPLETE
Subnets: ['subnet-08f5e3e0c9bd5e6c5', 'subnet-016142b6b4ca01fd8']
Security Groups: ['sg-0c1ec9176925f2c8e']


### VPC training in SageMaker
Now, we create the Tensorflow estimator, passing the `train_instance_type` and `distribution`.
#### Copy and paste subnets and security group ('sg') from above to the estimator definition below

In [18]:
estimator = TensorFlow(entry_point=train_script,
                       role=sagemaker_iam_role,
                       train_instance_count=instance_count,
                       train_instance_type=train_instance_type,
                       script_mode=True,
                       framework_version='1.12',
                       py_version = 'py3',
                       distributions=distributions,
                       security_group_ids=security_groups,
                       subnets=subnets,
                       base_job_name='hvd-mnist-vpc')

Call `fit()` to start the training

In [None]:
%%time
estimator.fit({"train":s3_train_path, "test":s3_test_path})

After training is completed, you can host the saved model by using TensorFlow Serving on SageMaker. For an example that uses TensorFlow Serving, see [(https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker-python-sdk/tensorflow_serving_container/tensorflow_serving_container.ipynb](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker-python-sdk/tensorflow_serving_container/tensorflow_serving_container.ipynb).

You can also depoloy the endpoint into specific VPC/subnet with security_id as follows (we are going to reuse subnets and security group created above for training, but you can also deploy the endpoint into different ones.

In [20]:
host_instance_type = 'ml.m4.xlarge'
vpc_config = {'Subnets': subnets, 'SecurityGroupIds': security_groups}
model = estimator.create_model(vpc_config_override= vpc_config)
predictor = model.deploy(initial_instance_count=1, instance_type=host_instance_type,
                              endpoint_name = 'vpc-endpoint-1')

--------------------------------------------------------------------------------------------------!

Please, note that in SageMaker Console, the Subnets and SecurityGroupIds will be visible in model's network settings, however, they will not appear as Endpoint parameters. 

## Reference Links:
* [SageMaker Container MPI Support.](https://github.com/aws/sagemaker-containers/blob/master/src/sagemaker_containers/_mpi.py)
* [Horovod Official Documentation](https://github.com/uber/horovod)
* [SageMaker Tensorflow script mode example.](https://github.com/awslabs/amazon-sagemaker-examples/blob/master/sagemaker-python-sdk/tensorflow_script_mode_quickstart/tensorflow_script_mode_quickstart.ipynb)
* [SageMaker_Tensorflow_Serving_Predictor](https://sagemaker.readthedocs.io/en/stable/sagemaker.tensorflow.html#sagemaker.tensorflow.serving.Predictor)