{ "cells": [ { "cell_type": "markdown", "metadata": {}, "source": [ "# Distributed training with horovod\n", "Horovod is a distributed training framework based on MPI. Horovod is only available with TensorFlow version 1.12 or newer. You can find more details at [Horovod README](https://github.com/uber/horovod).\n", "\n", "To enable Horovod, we need to make small changes to our script." ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Create a training script that support Horovod distributed training\n", "Create a copy of the script (training_script/cifar10_keras_sm.py, **not the pipe script**) and save it as training_script/cifar10_keras_dist.py.\n", "in:\n", "```python\n", "def main(args):\n", "```\n", "\n", "### Start horovod\n", "add horovod support using the following code:\n", "```python\n", " import horovod.keras as hvd\n", " hvd.init()\n", " config = tf.ConfigProto()\n", " config.gpu_options.allow_growth = True\n", " config.gpu_options.visible_device_list = str(hvd.local_rank())\n", " K.set_session(tf.Session(config=config))\n", "```\n", "\n", "### Configure callbacks\n", "add the following callbacks:\n", "```python\n", "hvdBroadcast = hvd.callbacks.BroadcastGlobalVariablesCallback(0)\n", "hvdMetricAverage = hvd.callbacks.MetricAverageCallback()\n", "hvdLearningRate = hvd.callbacks.LearningRateWarmupCallback(warmup_epochs=5, verbose=1)\n", "```\n", "\n", "change the checkpoint and tensorboard callback to run only on `hvd.rank() == o` (You want only a single process the send logs)\n", "```python\n", "callbacks = [hvdBroadcast,hvdMetricAverage,hvdLearningRate]\n", "if hvd.rank() == 0:\n", " callbacks.append(checkpoint)\n", " callbacks.append(tb_callback)\n", "```\n", "update model.fit to use the new callbacks list\n", "\n", "### Configure the optimizer\n", "in\n", "```python\n", "# Add hvd to the function. also add it in the function call\n", "def keras_model_fn(learning_rate, weight_decay, optimizer, momentum, hvd): \n", "```\n", "configure the horovod optimizer.\n", "Change `size=1` to `size=hvd.size()` \n", "\n", "add \n", "```python\n", "opt = hvd.DistributedOptimizer(opt)\n", "```\n", "before \n", "```python\n", " model.compile(loss='categorical_crossentropy',\n", " optimizer=opt,\n", " metrics=['accuracy'])\n", "```" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "## Run Distributed training\n", "To start a distributed training job with Horovod, configure the job distribution:\n", "```python\n", "distributions = {'mpi': {\n", " 'enabled': True,\n", " 'processes_per_host': # Number of Horovod processes per host\n", " }\n", " }\n", "```\n", "\n", "Run the same job using 2 ml.p3.2xlarge instances (processes_per_host:1). 
\n", "add the distributions configuration" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import os\n", "import sagemaker\n", "from sagemaker import get_execution_role\n", "\n", "sagemaker_session = sagemaker.Session()\n", "\n", "role = get_execution_role()" ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "### Load the SageMaker experiment" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from smexperiments.experiment import Experiment\n", "from smexperiments.trial import Trial\n", "import time\n", "cifar10_experiment = Experiment.load(\n", " experiment_name=\"TensorFlow-cifar10-experiment\")" ] }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# create a new trial\n", "trial_name = f\"cifar10-training-job-distributed-{int(time.time())}\"\n", "trial = Trial.create(\n", " trial_name=trial_name, \n", " experiment_name=cifar10_experiment.experiment_name\n", ")" ] }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "# Configure the dataset location variable\n", "dataset_location = sagemaker_session.upload_data(path='data', key_prefix='data/DEMO-cifar10')\n", "display(dataset_location)" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "code", "execution_count": null, "outputs": [], "source": [ "metric_definitions = [\n", " {'Name': 'train:loss', 'Regex': 'loss: ([0-9\\\\.]+) - acc: [0-9\\\\.]+'},\n", " {'Name': 'train:accuracy', 'Regex': 'loss: [0-9\\\\.]+ - acc: ([0-9\\\\.]+)'},\n", " {'Name': 'validation:accuracy', 'Regex': 'val_loss: [0-9\\\\.]+ - val_acc: ([0-9\\\\.]+)'},\n", " {'Name': 'validation:loss', 'Regex': 'val_loss: ([0-9\\\\.]+) - val_acc: [0-9\\\\.]+'},\n", "]" ], "metadata": { "collapsed": false, "pycharm": { "name": "#%%\n" } } }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "from sagemaker.tensorflow import TensorFlow\n", "# Change base_job_name to 'cifar10-dist' for console visibility\n", "# Remember to configure distributions = ...\n", "estimator = ... " ] }, { "cell_type": "markdown", "metadata": {}, "source": [ "Connect the trial configured above to the job. 
{ "cell_type": "markdown", "metadata": {}, "source": [ "Connect the trial configured above to the job.\n", "Add the experiment config to the fit function:\n", "```python\n", "experiment_config={\n", "    \"ExperimentName\": cifar10_experiment.experiment_name,\n", "    \"TrialName\": trial.trial_name,\n", "    \"TrialComponentDisplayName\": \"Training\"}\n", "```" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "# The channel paths below assume the train/validation/eval prefixes created by the upload_data cell above\n", "estimator.fit({'train': dataset_location + '/train',\n", "               'validation': dataset_location + '/validation',\n", "               'eval': dataset_location + '/eval'},\n", "              experiment_config={\n", "                  \"ExperimentName\": cifar10_experiment.experiment_name,\n", "                  \"TrialName\": trial.trial_name,\n", "                  \"TrialComponentDisplayName\": \"Training\"})" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "### Analyze the experiments" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "search_expression = {\n", "    \"Filters\": [\n", "        {\n", "            \"Name\": \"DisplayName\",\n", "            \"Operator\": \"Equals\",\n", "            \"Value\": \"Training\",\n", "        }\n", "    ],\n", "}" ] },
{ "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], "source": [ "import pandas as pd\n", "pd.options.display.max_columns = 500\n", "\n", "from sagemaker.analytics import ExperimentAnalytics\n", "\n", "trial_component_analytics = ExperimentAnalytics(\n", "    sagemaker_session=sagemaker_session,\n", "    experiment_name=cifar10_experiment.experiment_name,\n", "    search_expression=search_expression\n", ")\n", "\n", "table = trial_component_analytics.dataframe(force_refresh=True)\n", "display(table)" ] },
{ "cell_type": "markdown", "metadata": {}, "source": [ "**Good job!**  \n", "You can now use SageMaker training jobs for distributed training.\n", "Before continuing to the next notebook, look at the distributed job's metrics in CloudWatch and TensorBoard.\n", "You can use TensorBoard to compare the different jobs that you ran.\n", "Run TensorBoard with\n", "`--logdir dist:dist_model_dir,pipe:pipe_model_dir,file:normal_job_model_dir`" ] } ], "metadata": { "kernelspec": { "display_name": "conda_tensorflow_p36", "language": "python", "name": "conda_tensorflow_p36" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.6.5" }, "pycharm": { "stem_cell": { "cell_type": "raw", "source": [], "metadata": { "collapsed": false } } } }, "nbformat": 4, "nbformat_minor": 4 }